当前位置: 首页 > news >正文

通过语音指令,搜索手机通讯录联系人,自动拨号,方便视障用户打电话。

视障人士语音拨号助手

一、实际应用场景与痛点

应用场景

视障用户张先生需要经常联系家人和朋友。传统的通讯录操作对视障人士极不友好:触摸屏无法提供有效反馈,输入号码容易出错,查找联系人需要别人帮助。一次紧急情况下,他急需拨打急救电话,但无法快速找到联系人,耽误了宝贵时间。他急需一款能够通过语音指令快速拨打电话的智能助手。

核心痛点

1. 屏幕触摸困难:触摸屏缺乏触觉反馈,无法准确点击

2. 联系人查找慢:滚动查找联系人效率低下

3. 号码输入易错:语音输入号码容易识别错误

4. 紧急情况无助:紧急时无法快速拨打关键联系人

5. 隐私泄露风险:让他人帮助拨号泄露个人隐私

6. 操作步骤复杂:多步操作对视障用户不友好

二、核心逻辑设计

1. 启动语音监听,等待唤醒词

2. 识别用户语音指令

3. 解析指令意图(拨号/搜索/紧急呼叫)

4. 搜索通讯录匹配联系人

5. 语音确认联系人信息

6. 自动拨打确认的联系人

7. 通话状态监控和提示

8. 通话记录和常用联系人管理

三、模块化代码实现

主程序文件:voice_dialer_assistant.py

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

"""

视障人士语音拨号助手

通过语音指令搜索通讯录并自动拨号

版本:2.0.0

作者:无障碍智能助手

"""

import os

import sys

import json

import time

import sqlite3

import threading

import queue

import re

from datetime import datetime

from typing import Dict, List, Tuple, Optional, Any, Union

from dataclasses import dataclass, asdict, field

from enum import Enum

import warnings

warnings.filterwarnings('ignore')

# 语音识别

try:

import speech_recognition as sr

SPEECH_RECOGNITION_AVAILABLE = True

except ImportError:

SPEECH_RECOGNITION_AVAILABLE = False

print("警告: speech_recognition未安装,语音识别功能受限")

# 语音合成

try:

import pyttsx3

TTS_AVAILABLE = True

except ImportError:

TTS_AVAILABLE = False

print("警告: pyttsx3未安装,语音合成功能受限")

# 音频处理

try:

import pyaudio

import wave

import simpleaudio as sa

AUDIO_AVAILABLE = True

except ImportError:

AUDIO_AVAILABLE = False

# Android相关(如果运行在Android上)

try:

from android import Android

ANDROID_AVAILABLE = True

except ImportError:

ANDROID_AVAILABLE = False

# 系统调用(用于拨号)

import subprocess

import webbrowser

class CallType(Enum):

"""通话类型枚举"""

OUTGOING = "outgoing" # 拨出电话

INCOMING = "incoming" # 来电

MISSED = "missed" # 未接来电

REJECTED = "rejected" # 拒接

EMERGENCY = "emergency" # 紧急呼叫

class CommandType(Enum):

"""语音指令类型枚举"""

DIAL = "dial" # 拨打电话

SEARCH = "search" # 搜索联系人

EMERGENCY = "emergency" # 紧急呼叫

REDIAL = "redial" # 重拨

CANCEL = "cancel" # 取消操作

HELP = "help" # 帮助

LIST_CONTACTS = "list" # 列出联系人

ADD_CONTACT = "add" # 添加联系人

DELETE_CONTACT = "delete" # 删除联系人

UNKNOWN = "unknown" # 未知指令

@dataclass

class Contact:

"""联系人信息类"""

id: int

name: str

phone_numbers: List[str] # 可能有多个号码

groups: List[str] # 分组标签

notes: Optional[str] = None

frequency: int = 0 # 呼叫频率

last_called: Optional[float] = None

created_at: float = field(default_factory=time.time)

def to_dict(self) -> Dict:

"""转换为字典"""

return {

'id': self.id,

'name': self.name,

'phone_numbers': self.phone_numbers,

'groups': self.groups,

'notes': self.notes,

'frequency': self.frequency,

'last_called': datetime.fromtimestamp(self.last_called).isoformat()

if self.last_called else None,

'created_at': datetime.fromtimestamp(self.created_at).isoformat()

}

@dataclass

class CallRecord:

"""通话记录类"""

id: int

contact_id: Optional[int]

contact_name: str

phone_number: str

call_type: CallType

timestamp: float

duration: float = 0.0 # 通话时长(秒)

successful: bool = True

def to_dict(self) -> Dict:

"""转换为字典"""

return {

'id': self.id,

'contact_id': self.contact_id,

'contact_name': self.contact_name,

'phone_number': self.phone_number,

'call_type': self.call_type.value,

'timestamp': datetime.fromtimestamp(self.timestamp).isoformat(),

'duration': self.duration,

'successful': self.successful

}

@dataclass

class VoiceCommand:

"""语音指令类"""

raw_text: str

command_type: CommandType

parameters: Dict[str, Any]

confidence: float

timestamp: float = field(default_factory=time.time)

def to_dict(self) -> Dict:

"""转换为字典"""

return {

'raw_text': self.raw_text,

'command_type': self.command_type.value,

'parameters': self.parameters,

'confidence': self.confidence,

'timestamp': datetime.fromtimestamp(self.timestamp).isoformat()

}

class ContactDatabase:

"""联系人数据库管理器"""

def __init__(self, db_path: str = "data/contacts.db"):

"""

初始化数据库

Args:

db_path: 数据库文件路径

"""

self.db_path = db_path

self.connection = None

self.setup_database()

def setup_database(self):

"""设置数据库"""

# 确保数据目录存在

os.makedirs(os.path.dirname(self.db_path), exist_ok=True)

# 连接数据库

self.connection = sqlite3.connect(self.db_path)

self.connection.row_factory = sqlite3.Row

# 创建表

self._create_tables()

# 插入示例数据(如果数据库为空)

self._insert_sample_data()

def _create_tables(self):

"""创建数据表"""

cursor = self.connection.cursor()

# 联系人表

cursor.execute('''

CREATE TABLE IF NOT EXISTS contacts (

id INTEGER PRIMARY KEY AUTOINCREMENT,

name TEXT NOT NULL,

phone_numbers TEXT NOT NULL, -- JSON数组

groups TEXT NOT NULL, -- JSON数组

notes TEXT,

frequency INTEGER DEFAULT 0,

last_called REAL,

created_at REAL NOT NULL

)

''')

# 通话记录表

cursor.execute('''

CREATE TABLE IF NOT EXISTS call_records (

id INTEGER PRIMARY KEY AUTOINCREMENT,

contact_id INTEGER,

contact_name TEXT NOT NULL,

phone_number TEXT NOT NULL,

call_type TEXT NOT NULL,

timestamp REAL NOT NULL,

duration REAL DEFAULT 0,

successful BOOLEAN DEFAULT TRUE,

FOREIGN KEY (contact_id) REFERENCES contacts (id)

)

''')

# 语音指令记录表

cursor.execute('''

CREATE TABLE IF NOT EXISTS voice_commands (

id INTEGER PRIMARY KEY AUTOINCREMENT,

raw_text TEXT NOT NULL,

command_type TEXT NOT NULL,

parameters TEXT NOT NULL, -- JSON对象

confidence REAL NOT NULL,

timestamp REAL NOT NULL

)

''')

# 紧急联系人表

cursor.execute('''

CREATE TABLE IF NOT EXISTS emergency_contacts (

id INTEGER PRIMARY KEY AUTOINCREMENT,

contact_id INTEGER NOT NULL,

emergency_type TEXT NOT NULL, -- police/fire/ambulance/family/doctor

priority INTEGER DEFAULT 1,

FOREIGN KEY (contact_id) REFERENCES contacts (id)

)

''')

self.connection.commit()

def _insert_sample_data(self):

"""插入示例数据"""

cursor = self.connection.cursor()

# 检查是否有数据

cursor.execute("SELECT COUNT(*) FROM contacts")

count = cursor.fetchone()[0]

if count == 0:

# 添加示例联系人

sample_contacts = [

("张三", ["13800138000"], ["家人"], "父亲", 10),

("李四", ["13900139000"], ["朋友"], "好友", 5),

("王五", ["13600136000"], ["同事"], "同事", 3),

("急救中心", ["120"], ["紧急"], "医疗急救", 0),

("报警电话", ["110"], ["紧急"], "警察", 0),

("火警电话", ["119"], ["紧急"], "消防", 0)

]

for name, phones, groups, notes, freq in sample_contacts:

self.add_contact(

name=name,

phone_numbers=phones,

groups=groups,

notes=notes,

frequency=freq

)

# 添加紧急联系人

emergency_mapping = [

("急救中心", "ambulance", 1),

("报警电话", "police", 1),

("火警电话", "fire", 1),

("张三", "family", 2)

]

for contact_name, etype, priority in emergency_mapping:

cursor.execute("SELECT id FROM contacts WHERE name = ?", (contact_name,))

result = cursor.fetchone()

if result:

self.add_emergency_contact(result[0], etype, priority)

self.connection.commit()

print("已添加示例联系人数据")

def add_contact(self, name: str, phone_numbers: List[str],

groups: List[str] = None, notes: str = None,

frequency: int = 0) -> int:

"""

添加联系人

Args:

name: 联系人姓名

phone_numbers: 电话号码列表

groups: 分组列表

notes: 备注

frequency: 呼叫频率

Returns:

联系人ID

"""

if groups is None:

groups = []

cursor = self.connection.cursor()

cursor.execute('''

INSERT INTO contacts

(name, phone_numbers, groups, notes, frequency, created_at)

VALUES (?, ?, ?, ?, ?, ?)

''', (

name,

json.dumps(phone_numbers, ensure_ascii=False),

json.dumps(groups, ensure_ascii=False),

notes,

frequency,

time.time()

))

contact_id = cursor.lastrowid

self.connection.commit()

return contact_id

def get_contact(self, contact_id: int) -> Optional[Contact]:

"""获取联系人"""

cursor = self.connection.cursor()

cursor.execute("SELECT * FROM contacts WHERE id = ?", (contact_id,))

row = cursor.fetchone()

if row:

return self._row_to_contact(row)

return None

def search_contacts(self, keyword: str, limit: int = 10) -> List[Contact]:

"""

搜索联系人

Args:

keyword: 搜索关键词

limit: 返回结果数量限制

Returns:

匹配的联系人列表

"""

cursor = self.connection.cursor()

# 使用SQLite的全文搜索(如果支持)或LIKE搜索

if keyword:

query = '''

SELECT * FROM contacts

WHERE name LIKE ? OR phone_numbers LIKE ? OR groups LIKE ?

ORDER BY frequency DESC, last_called DESC

LIMIT ?

'''

like_keyword = f"%{keyword}%"

cursor.execute(query, (like_keyword, like_keyword, like_keyword, limit))

else:

# 如果没有关键词,返回最常联系的人

query = '''

SELECT * FROM contacts

ORDER BY frequency DESC, last_called DESC

LIMIT ?

'''

cursor.execute(query, (limit,))

rows = cursor.fetchall()

return [self._row_to_contact(row) for row in rows]

def _row_to_contact(self, row) -> Contact:

"""将数据库行转换为Contact对象"""

return Contact(

id=row['id'],

name=row['name'],

phone_numbers=json.loads(row['phone_numbers']),

groups=json.loads(row['groups']),

notes=row['notes'],

frequency=row['frequency'],

last_called=row['last_called'],

created_at=row['created_at']

)

def update_contact_frequency(self, contact_id: int):

"""更新联系人呼叫频率"""

cursor = self.connection.cursor()

cursor.execute('''

UPDATE contacts

SET frequency = frequency + 1, last_called = ?

WHERE id = ?

''', (time.time(), contact_id))

self.connection.commit()

def add_call_record(self, contact_id: Optional[int], contact_name: str,

phone_number: str, call_type: CallType,

duration: float = 0.0, successful: bool = True) -> int:

"""

添加通话记录

Returns:

记录ID

"""

cursor = self.connection.cursor()

cursor.execute('''

INSERT INTO call_records

(contact_id, contact_name, phone_number, call_type,

timestamp, duration, successful)

VALUES (?, ?, ?, ?, ?, ?, ?)

''', (

contact_id,

contact_name,

phone_number,

call_type.value,

time.time(),

duration,

successful

))

record_id = cursor.lastrowid

self.connection.commit()

return record_id

def get_recent_calls(self, limit: int = 20) -> List[CallRecord]:

"""获取最近通话记录"""

cursor = self.connection.cursor()

cursor.execute('''

SELECT * FROM call_records

ORDER BY timestamp DESC

LIMIT ?

''', (limit,))

rows = cursor.fetchall()

records = []

for row in rows:

records.append(CallRecord(

id=row['id'],

contact_id=row['contact_id'],

contact_name=row['contact_name'],

phone_number=row['phone_number'],

call_type=CallType(row['call_type']),

timestamp=row['timestamp'],

duration=row['duration'],

successful=bool(row['successful'])

))

return records

def add_emergency_contact(self, contact_id: int,

emergency_type: str, priority: int = 1):

"""添加紧急联系人"""

cursor = self.connection.cursor()

cursor.execute('''

INSERT OR REPLACE INTO emergency_contacts

(contact_id, emergency_type, priority)

VALUES (?, ?, ?)

''', (contact_id, emergency_type, priority))

self.connection.commit()

def get_emergency_contacts(self, emergency_type: str = None) -> List[Contact]:

"""获取紧急联系人"""

cursor = self.connection.cursor()

if emergency_type:

query = '''

SELECT c.* FROM contacts c

JOIN emergency_contacts ec ON c.id = ec.contact_id

WHERE ec.emergency_type = ?

ORDER BY ec.priority ASC

'''

cursor.execute(query, (emergency_type,))

else:

query = '''

SELECT c.* FROM contacts c

JOIN emergency_contacts ec ON c.id = ec.contact_id

ORDER BY ec.emergency_type, ec.priority ASC

'''

cursor.execute(query)

rows = cursor.fetchall()

return [self._row_to_contact(row) for row in rows]

def log_voice_command(self, command: VoiceCommand):

"""记录语音指令"""

cursor = self.connection.cursor()

cursor.execute('''

INSERT INTO voice_commands

(raw_text, command_type, parameters, confidence, timestamp)

VALUES (?, ?, ?, ?, ?)

''', (

command.raw_text,

command.command_type.value,

json.dumps(command.parameters, ensure_ascii=False),

command.confidence,

command.timestamp

))

self.connection.commit()

def close(self):

"""关闭数据库连接"""

if self.connection:

self.connection.close()

class SpeechRecognizer:

"""语音识别器"""

def __init__(self, config: Dict):

"""

初始化语音识别器

Args:

config: 识别器配置

"""

self.config = config

self.recognizer = None

self.microphone = None

if SPEECH_RECOGNITION_AVAILABLE:

self._initialize_recognizer()

def _initialize_recognizer(self):

"""初始化语音识别组件"""

try:

self.recognizer = sr.Recognizer()

self.microphone = sr.Microphone()

# 调整环境噪声

with self.microphone as source:

print("正在调整环境噪声,请保持安静...")

self.recognizer.adjust_for_ambient_noise(source, duration=1)

print("语音识别器初始化成功")

except Exception as e:

print(f"语音识别器初始化失败: {e}")

self.recognizer = None

self.microphone = None

def listen_for_wake_word(self, wake_word: str = "小助手",

timeout: float = None, phrase_time_limit: float = 2) -> bool:

"""

监听唤醒词

Args:

wake_word: 唤醒词

timeout: 超时时间(秒)

phrase_time_limit: 短语时长限制

Returns:

是否检测到唤醒词

"""

if not self.recognizer or not self.microphone:

return False

try:

with self.microphone as source:

print(f"正在监听唤醒词: '{wake_word}'...")

audio = self.recognizer.listen(source, timeout=timeout,

phrase_time_limit=phrase_time_limit)

# 识别语音

try:

text = self.recognizer.recognize_google(audio, language='zh-CN')

print(f"识别到语音: {text}")

# 检查是否包含唤醒词

if wake_word.lower() in text.lower():

print(f"检测到唤醒词: {wake_word}")

return True

except sr.UnknownValueError:

print("无法识别语音")

except sr.RequestError as e:

print(f"语音识别服务错误: {e}")

except sr.WaitTimeoutError:

pass # 超时是正常的

return False

def listen_for_command(self, timeout: float = 5,

phrase_time_limit: float = 5) -> Optional[str]:

"""

监听语音指令

Args:

timeout: 超时时间

phrase_time_limit: 短语时长限制

Returns:

识别的文本,失败返回None

"""

if not self.recognizer or not self.microphone:

return None

try:

print("请说出指令...")

with self.microphone as source:

audio = self.recognizer.listen(source, timeout=timeout,

phrase_time_limit=phrase_time_limit)

# 识别语音

try:

text = self.recognizer.recognize_google(audio, language='zh-CN')

print(f"识别到指令: {text}")

return text

except sr.UnknownValueError:

print("无法识别语音指令")

return None

except sr.RequestError as e:

print(f"语音识别服务错误: {e}")

return None

except sr.WaitTimeoutError:

print("等待指令超时")

return None

def continuous_listen(self, callback, stop_event: threading.Event):

"""

持续监听语音

Args:

callback: 识别到语音时的回调函数

stop_event: 停止事件

"""

if not self.recognizer or not self.microphone:

return

print("开始持续语音监听...")

while not stop_event.is_set():

try:

text = self.listen_for_command(timeout=1)

if text and callback:

callback(text)

except Exception as e:

print(f"监听错误: {e}")

time.sleep(0.1)

class VoiceSynthesizer:

"""语音合成器"""

def __init__(self, config: Dict):

"""

初始化语音合成器

Args:

config: 合成器配置

"""

self.config = config

self.tts_engine = None

if TTS_AVAILABLE:

self._initialize_tts()

def _initialize_tts(self):

"""初始化TTS引擎"""

try:

self.tts_engine = pyttsx3.init()

# 设置语音属性

rate = self.config.get('speech_r

如果你觉得这个工具好用,欢迎关注我!

http://www.jsqmd.com/news/205651/

相关文章:

  • Z-Image-Turbo在广告素材批量生成中的应用实例
  • 基于Java的婴儿游泳馆智慧管理系统的设计与实现全方位解析:附毕设论文+源代码
  • Windows快捷键冲突终极排查指南:热键侦探实战手册
  • 在qt中使用ZH-44043d采集器
  • 玻璃贴膜哪家好?2026精选屏幕保护膜厂家以及车窗膜品牌推荐分析 - 栗子测评
  • AVIF格式Photoshop插件完整使用指南:快速实现高效图像压缩与HDR处理
  • MPh革命性突破:Python驱动COMSOL实现智能化仿真工作流
  • 2026年青海政采云产品上传机构排行:政采云商品上传实力机构有哪些? - 工业品牌热点
  • 主流支付宝消费券回收方式全解析 - 京顺回收
  • 3分钟搞定Figma中文界面:设计师必备的终极本地化方案
  • 让OneNote变身专业Markdown编辑器的完整指南
  • DM数据库物理存储结构深度解析与理论实践
  • Z-Image-Edit自然语言编辑能力边界探索
  • Z-Image-Base模型性能瓶颈分析:哪些环节最耗资源?
  • 2026年度圆锯机品牌商推荐供应商排行榜,节能型圆锯机供应商新测评精选 - mypinpai
  • 扫路车专业厂家优质之选,程力专汽实力领航 - myqiye
  • 3分钟搞定Android Studio中文界面:新手必备的完整汉化指南
  • 【JPCS出版 | EI检索】第五届能源利用与自动化国际学术会议(ICEUA 2026)
  • 2026年蝶阀市场新观察:哪些厂家表现亮眼?蝶阀/半球阀/三通球阀/气动调节阀/冶金阀门/调节阀,蝶阀工厂哪家强 - 品牌推荐师
  • Coze AI Agent“智能体”工作流搭建全解析:一篇文章让你彻底明白!
  • AI智能体应用架构全解析:从用户输入到生成回复,揭秘12个关键步骤与核心组件!
  • 【程序员必看】VSCode后台智能体隔离技术:让编辑器提速300%
  • ZoteroTheme插件终极美化指南:深度定制文献管理界面
  • Z-Image-ComfyUI插件生态系统构想:第三方扩展支持
  • 2026年微信立减金回收回收平台大盘点 - 淘淘收小程序
  • 国内六轴数控穿孔机主流厂家全解析(附评分与联系方式) - 品牌推荐大师
  • configure: WARNING: unrecognized options: --with-mysql
  • Android Studio中文界面终极指南:3分钟实现全中文开发环境
  • 执医考试通关攻略:精选资料助力高效备考,医考生必看! - 品牌测评鉴赏家
  • 2026内衬涂塑钢管厂家新选:球墨铸铁内衬塑钢管厂家技术对比 - 栗子测评