从语音到文字:用 OpenAI Whisper 构建智能语音笔记应用的完整指南

关键词

Whisper, 语音识别, 语音笔记, 自然语言处理, Python, API 集成, 实时转录

摘要

在信息爆炸的时代,我们每天都被海量信息包围,如何高效捕捉和整理思想成为一项挑战。语音笔记作为一种自然直观的记录方式,正在改变我们与信息交互的方式。本文深入探讨如何利用 OpenAI 的 Whisper 语音识别模型构建功能强大的语音笔记应用。我们将从基础概念讲起,逐步深入技术原理,提供完整的实现步骤,并探讨实际应用中的挑战与解决方案。无论你是开发新手还是有经验的工程师,这篇指南都将帮助你掌握语音识别技术,并将其应用到实际项目中,打造属于自己的智能语音笔记系统。

1. 背景介绍

1.1 语音笔记的时代需求

想象一下,你正在参加一个重要会议,大脑高速运转吸收信息,同时还要分心记录要点;或者你正在散步时突然迸发灵感,却苦于无法立即记录;又或者你想在通勤途中"写"一篇文章,但双手被方向盘占用。这些场景下,语音笔记成为了理想的解决方案。

语音是人类最自然、最高效的交流方式。研究表明,语音输入的速度可达每分钟 150-160 个单词,而打字速度平均仅为每分钟 40 个单词。这种效率差异使得语音笔记在快节奏的现代生活中具有独特优势。

1.2 传统语音识别的痛点

尽管语音笔记的概念并不新鲜,但传统语音识别技术存在诸多局限:

  • 准确率不足:尤其是在处理专业术语、口音或背景噪音时
  • 延迟明显:实时转录体验不佳
  • 需要联网:无法在没有网络连接的环境下使用
  • 个性化欠缺:难以适应特定用户的语音习惯和专业领域
  • 功能单一:仅限于简单的语音转文字,缺乏深度处理能力

这些痛点限制了语音笔记应用的普及和用户体验。

1.3 Whisper:语音识别的革命性突破

2022 年 9 月,OpenAI 发布了 Whisper——一个开源的通用语音识别模型,彻底改变了语音识别领域的格局。Whisper 不仅在准确率上实现了质的飞跃,还支持多语言识别、语音翻译、说话人识别等多种功能。

与传统语音识别系统相比,Whisper 具有以下显著优势:

  • 更高的准确率:在多种测试集上超越了传统商业系统
  • 多语言支持:原生支持 99 种语言的语音识别
  • 离线可用:可在本地部署,无需持续网络连接
  • 功能丰富:除转录外,还支持翻译、时间戳标记、段落分割等
  • 开源免费:源代码完全开放,可自由使用和修改

这些特性使 Whisper 成为构建语音笔记应用的理想选择。

1.4 本文目标读者

本文主要面向以下读者:

  • 开发者:希望将语音识别功能集成到自己应用中的软件工程师
  • 产品经理:想了解语音笔记应用技术实现的产品负责人
  • 创业者:计划开发基于语音识别的创新产品的创业者
  • 技术爱好者:对 AI 语音技术感兴趣并希望亲手实践的技术爱好者

无论你是语音识别领域的新手,还是有经验的开发者,本文都将为你提供从理论到实践的全面指导。

1.5 阅读收益

阅读本文后,你将能够:

  • 理解 Whisper 模型的工作原理和核心优势
  • 搭建完整的 Whisper 开发环境
  • 实现基础的语音转录功能
  • 构建具有高级功能的语音笔记应用
  • 解决实际部署中可能遇到的挑战
  • 了解语音识别技术的发展趋势和未来方向

2. 核心概念解析

2.1 语音识别的基本原理

语音识别,简单来说,就是让计算机能够"听懂"人类的语言,并将其转换为文字形式。这个过程可以类比为教一个不懂中文的外国人听懂并写下中文对话——计算机需要学习语音的发音规律、词汇、语法,甚至语境。

语音识别系统通常包括以下几个核心步骤:

  1. 音频预处理:清理音频信号,去除噪音,标准化音量等
  2. 特征提取:将原始音频转换为计算机可理解的特征表示
  3. 声学模型:识别音频特征对应的音素(语音的基本单位)
  4. 语言模型:根据语言规律将音素组合成有意义的词语和句子
  5. 后处理:优化识别结果,修正可能的错误

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

2.2 Whisper 模型架构解析

Whisper 采用了一种端到端的深度学习架构,将整个语音识别过程整合到一个统一的模型中。如果把传统语音识别系统比作一条由多个专家组成的生产线,那么 Whisper 就像是一个全能专家,能够独立完成所有任务。

Whisper 的架构主要由两部分组成:

  1. 编码器(Encoder):负责将音频信号转换为特征表示
  2. 解码器(Decoder):负责将特征表示转换为目标文本

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

更具体地说,Whisper 的工作流程如下:

  1. 将音频分割成 30 秒的片段(这是 Whisper 的处理单位)
  2. 将每个片段转换为梅尔频谱图(一种可视化音频频率随时间变化的表示方法)
  3. 编码器处理梅尔频谱图,生成音频特征序列
  4. 解码器接收编码器输出和之前生成的文本 tokens,预测下一个文本 token
  5. 重复步骤 4,直到生成完整的转录文本

这种架构的优势在于:

  • 端到端学习:整个系统从原始音频直接学习到文本输出,避免了传统系统中各组件单独优化带来的局限性
  • 上下文理解:解码器能够利用上下文信息修正识别错误,提高整体准确率
  • 多任务能力:同一架构可支持语音识别、翻译、语言识别等多种任务

2.3 Whisper 模型家族

Whisper 提供了多种不同规模的模型,以满足不同应用场景的需求:

模型大小 参数数量 英文转录速度 英文转录准确率 多语言能力 推荐使用场景
tiny 39M 最快 适中 基础 资源受限设备,实时应用
base 74M 良好 良好 移动应用,对速度有要求的场景
small 244M 中等 优秀 优秀 平衡速度和准确率的通用场景
medium 769M 较慢 非常优秀 非常优秀 对准确率要求高的应用
large 1550M 最慢 最佳 最佳 离线应用,需要最高准确率的场景

选择合适的模型需要权衡以下因素:

  • 准确率要求:重要会议记录可能需要 large 模型,而快速笔记可能 tiny 模型就足够
  • 设备性能:手机等移动设备可能只能运行 small 及以下模型
  • 响应速度:实时转录需要更快的模型
  • 网络状况:离线使用需要本地部署较大模型

2.4 语音笔记应用的核心组件

一个完整的语音笔记应用通常包含以下核心组件:

音频采集模块
音频预处理模块
语音识别模块 - Whisper
文本处理模块
笔记管理模块
用户界面
高级功能模块
  1. 音频采集模块:负责录制或导入音频
  2. 音频预处理模块:优化音频质量,提高识别准确率
  3. 语音识别模块:核心模块,使用 Whisper 将语音转换为文本
  4. 文本处理模块:对识别结果进行格式化、纠错、分段等处理
  5. 笔记管理模块:负责笔记的存储、检索、分类和同步
  6. 高级功能模块:如摘要生成、关键词提取、翻译等
  7. 用户界面:提供直观的交互方式

这些组件协同工作,为用户提供从语音输入到笔记管理的完整体验。

2.5 Whisper 与其他语音识别技术对比

为了更好地理解 Whisper 的优势,我们将其与其他主流语音识别技术进行对比:

特性 Whisper 传统商业API
(如Google Cloud Speech-to-Text)
专用语音助手
(如Siri, Alexa)
离线使用 支持 有限支持 部分支持
自定义能力 高(可微调) 中(通过自定义词汇表)
多语言支持 99种语言 约120种语言 约50种语言
专业领域适应 可通过微调实现 有限
成本 免费(本地部署) 按使用量付费 免费(但有功能限制)
隐私保护 高(本地处理) 低(数据上传) 中(部分本地处理)
转录质量 优秀 优秀 一般(针对命令优化)

对于语音笔记应用而言,Whisper 提供了最佳的平衡:优秀的转录质量、完全的离线能力、高隐私保护和零成本(本地部署时)。

3. 技术原理与实现

3.1 Whisper 的工作原理深度解析

要深入理解 Whisper 的工作原理,我们需要从音频处理开始,逐步了解模型如何将声波转换为文本。

3.1.1 音频信号的数字化表示

声音本质上是空气的振动,是一种模拟信号。计算机处理声音时,首先需要将其转换为数字信号,这个过程称为采样。

想象一条连续的曲线(模拟音频)被我们用无数个点来表示(数字音频),采样率就是我们每秒采集的点数。Whisper 使用 16kHz 的采样率,意味着它每秒对音频信号进行 16,000 次采样。

采样后的音频信号表示为一个数值序列,每个数值代表特定时间点的声音振幅。

3.1.2 梅尔频谱图:声音的视觉表示

原始音频波形对计算机来说仍然难以理解。Whisper 将音频转换为梅尔频谱图(Mel Spectrogram)——一种能更好表示声音特征的视觉化形式。

可以将梅尔频谱图比作声音的"指纹",其中:

  • 横轴表示时间
  • 纵轴表示频率(声音的高低)
  • 颜色深浅表示不同频率声音的强度

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

转换过程使用傅里叶变换,将时域信号转换为频域表示,然后通过梅尔刻度(一种模拟人耳对声音频率感知的非线性刻度)进行滤波。

数学上,梅尔频谱图的计算过程可以表示为:

M(f)=2595log⁡10(1+f700) M(f) = 2595 \log_{10}\left(1 + \frac{f}{700}\right) M(f)=2595log10(1+700f)

其中 M(f)M(f)M(f) 是梅尔频率,fff 是实际频率(Hz)。这个公式模拟了人耳对低频声音更敏感而对高频声音分辨率较低的特性。

3.1.3 Transformer 架构在 Whisper 中的应用

Whisper 基于 Transformer 架构,这是一种使用自注意力机制的深度学习模型。Transformer 最初用于自然语言处理,Whisper 将其成功应用于语音识别。

自注意力机制可以理解为模型在处理每个部分信息时,能够"关注"到输入中最相关的其他部分。就像我们阅读时,会根据上下文理解某个词语的含义,Whisper 也会根据整个音频上下文来"理解"当前正在处理的声音片段。

Whisper 的编码器将梅尔频谱图转换为隐藏状态序列,解码器则将这些隐藏状态转换为文本。编码器和解码器都使用了多头自注意力机制,使模型能够捕捉音频中的长距离依赖关系。

3.1.4 条件生成与任务自适应

Whisper 的一个独特之处是它能够通过"提示"(prompt)来执行不同的语音处理任务。这是通过在解码器输入中添加特殊的任务指令实现的。

例如:

  • 要进行英语转录,模型会收到类似 “<|en|><|transcribe|><|notimestamps|>” 的指令
  • 要将中文语音翻译成英文,模型会收到类似 “<|zh|><|translate|><|notimestamps|>” 的指令

这种设计使单个模型能够灵活适应多种任务,大大增强了其通用性。

3.2 环境搭建与基础配置

在开始实现之前,我们需要搭建合适的开发环境。以下是详细步骤:

3.2.1 Python 环境准备

Whisper 基于 Python 开发,推荐使用 Python 3.8 或更高版本。我们首先创建一个虚拟环境:

# 创建虚拟环境
python -m venv whisper-env

# 激活虚拟环境
# Windows
whisper-env\Scripts\activate
# macOS/Linux
source whisper-env/bin/activate

# 更新pip
pip install --upgrade pip
3.2.2 安装 Whisper

OpenAI 提供了官方的 Whisper Python 包,可以通过 pip 直接安装:

# 基础安装
pip install openai-whisper

# 如需支持GPU加速(推荐)
pip install openai-whisper[torch]

# 如需从源码安装(获取最新功能)
pip install git+https://github.com/openai/whisper.git
3.2.3 安装依赖项

Whisper 需要一些额外的依赖项,特别是用于音频处理的 ffmpeg:

# Ubuntu/Debian
sudo apt update && sudo apt install ffmpeg

# macOS (使用Homebrew)
brew install ffmpeg

# Windows (使用Chocolatey)
choco install ffmpeg
3.2.4 验证安装

安装完成后,我们可以通过以下命令验证:

# 检查Whisper版本
whisper --version

# 运行简单的转录测试
whisper --model tiny "https://freesound.org/data/previews/347/347069_6157011-lq.mp3"

如果一切正常,你应该能看到音频文件的转录结果。

3.2.5 GPU 加速配置(可选但推荐)

Whisper 支持 GPU 加速,这可以显著提高转录速度。要启用 GPU 支持,需要安装 CUDA 兼容的 PyTorch:

# 安装支持CUDA的PyTorch
pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

验证 GPU 是否可用:

import torch
print(torch.cuda.is_available())  # 应输出 True

3.3 Whisper 基础 API 使用

Whisper 提供了简洁易用的 API,让开发者可以快速集成语音识别功能。以下是基础用法详解:

3.3.1 命令行接口 (CLI)

Whisper 安装后提供了一个命令行工具,可以直接用于转录音频文件:

# 基本用法
whisper audio_file.mp3

# 指定模型(默认为small)
whisper audio_file.mp3 --model medium

# 指定语言(自动检测时可能不准确)
whisper audio_file.mp3 --language Chinese

# 输出多种格式(文本、SRT、VTT等)
whisper audio_file.mp3 --output_format txt --output_format srt

# 指定输出目录
whisper audio_file.mp3 --output_dir ./transcriptions

这对于快速测试和批量处理音频文件非常有用。

3.3.2 Python API

对于应用开发,我们主要使用 Whisper 的 Python API:

import whisper

# 加载模型
model = whisper.load_model("base")

# 转录音频文件
result = model.transcribe("audio_file.mp3")

# 输出转录文本
print(result["text"])

这段简单的代码就能完成从音频到文本的转换。result 对象包含丰富的信息:

# 完整结果结构
{
  "text": "完整的转录文本...",
  "segments": [
    {
      "id": 0,  # 段落ID
      "seek": 0,  # 音频中的起始位置(毫秒)
      "start": 0.0,  # 段落开始时间(秒)
      "end": 3.0,  # 段落结束时间(秒)
      "text": "第一段文本...",  # 段落文本
      "tokens": [...],  # 文本对应的tokens
      "temperature": 0.0,  # 采样温度
      "avg_logprob": -0.45,  # 平均对数概率(可信度指标)
      "compression_ratio": 1.0,  # 压缩比
      "no_speech_prob": 0.01  # 无语音概率
    },
    # 更多段落...
  ],
  "language": "zh"  # 检测到的语言
}
3.3.3 高级参数配置

Whisper 提供了多种参数来调整转录行为:

result = model.transcribe(
    "audio_file.mp3",
    language="zh",  # 指定语言,None表示自动检测
    temperature=0.5,  # 采样温度,0表示确定性输出,值越高随机性越大
    word_timestamps=True,  # 是否生成单词级别的时间戳
    initial_prompt="这是一段技术讲座的录音,内容涉及人工智能和机器学习。",  # 初始提示,帮助模型理解上下文
    condition_on_previous_text=True,  # 是否使用前面的文本作为上下文
    fp16=False  # 是否使用FP16精度(CPU环境设为False)
)

这些参数的合理配置可以显著提高特定场景下的转录质量。

3.3.4 音频流处理

对于实时应用,我们需要处理音频流而不是文件。以下是一个简单的音频流处理示例:

import numpy as np
import whisper
import sounddevice as sd

# 配置
model = whisper.load_model("base")
sample_rate = 16000  # Whisper要求的采样率
chunk_duration = 3  # 每3秒处理一次
chunk_samples = sample_rate * chunk_duration

# 音频回调函数
def audio_callback(indata, frames, time, status):
    if status:
        print(f"音频状态: {status}", file=sys.stderr)
    
    # 将音频数据转换为Whisper所需格式
    audio = np.squeeze(indata)
    
    # 转录当前音频块
    result = model.transcribe(
        audio,
        language="zh",
        temperature=0.0,
        without_timestamps=True,  # 对于实时流,不需要时间戳
        initial_prompt=previous_text  # 使用之前的文本作为上下文
    )
    
    # 保存当前文本用于下一次提示
    global previous_text
    previous_text += result["text"] + " "
    print(result["text"], end=" ", flush=True)

# 初始化
previous_text = ""

# 开始录音和转录
print("开始实时转录... (按Ctrl+C停止)")
with sd.InputStream(
    samplerate=sample_rate, 
    channels=1, 
    dtype=np.float32,
    blocksize=chunk_samples,
    callback=audio_callback
):
    while True:
        time.sleep(1)

这段代码使用 sounddevice 库捕获音频流,并每 3 秒处理一次,实现基本的实时转录功能。

3.4 自定义模型与性能优化

对于生产环境的语音笔记应用,我们可能需要对 Whisper 进行自定义和优化。

3.4.1 模型微调基础

虽然 Whisper 已经在大规模数据集上进行了训练,但针对特定领域(如医学、法律、技术术语)进行微调可以进一步提高准确率。

微调的基本步骤:

  1. 准备数据集:创建包含音频文件和对应文本转录的数据集
  2. 数据预处理:将音频转换为梅尔频谱图,文本转换为 tokens
  3. 配置训练参数:学习率、批次大小、训练轮数等
  4. 执行微调:使用自定义数据继续训练模型
  5. 评估和调整:在验证集上评估性能,调整参数

以下是使用 Hugging Face Transformers 库微调 Whisper 的简化示例:

from datasets import load_dataset
from transformers import WhisperForConditionalGeneration, WhisperProcessor
import torch

# 加载数据集
dataset = load_dataset("json", data_files={"train": "train.json", "validation": "valid.json"})

# 加载预处理器和模型
processor = WhisperProcessor.from_pretrained("openai/whisper-base", language="zh", task="transcribe")
model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-base")

# 数据预处理函数
def preprocess_function(examples):
    audio = examples["audio"]
    text = examples["transcription"]
    
    # 处理音频
    input_features = processor(audio["array"], sampling_rate=audio["sampling_rate"], return_tensors="pt").input_features[0]
    
    # 处理文本
    labels = processor(text=text).input_ids
    
    return {"input_features": input_features, "labels": labels}

# 应用预处理
processed_dataset = dataset.map(preprocess_function)

# 准备训练参数
from transformers import Seq2SeqTrainingArguments, Seq2SeqTrainer

training_args = Seq2SeqTrainingArguments(
    output_dir="./whisper-finetuned",
    per_device_train_batch_size=16,
    learning_rate=1e-5,
    num_train_epochs=10,
    logging_dir="./logs",
    logging_steps=10,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    fp16=True,  # 如果有GPU支持
)

# 初始化训练器
trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=processed_dataset["train"],
    eval_dataset=processed_dataset["validation"],
    tokenizer=processor.feature_extractor,
)

# 开始微调
trainer.train()

注意:微调需要大量数据和计算资源,通常需要 GPU 支持。

3.4.2 模型量化与优化

对于资源受限的设备(如手机或嵌入式设备),我们可以对模型进行量化以减小体积和提高速度:

# 加载量化模型(INT8量化)
import torch
model = whisper.load_model("base", device="cpu")
quantized_model = torch.quantization.quantize_dynamic(
    model, {torch.nn.Linear}, dtype=torch.qint8
)

# 保存量化模型
torch.save(quantized_model.state_dict(), "whisper-base-quantized.pt")

Whisper 还支持多种推理优化技术:

# 使用TorchScript优化推理
scripted_model = torch.jit.script(model)

# 使用ONNX格式(需要额外安装onnx和onnxruntime)
import torch.onnx
torch.onnx.export(model, input_sample, "whisper.onnx", opset_version=12)

这些优化可以显著提高模型在边缘设备上的运行速度,同时减少内存占用。

3.4.3 服务化部署

对于多用户应用,我们可以将 Whisper 部署为 API 服务:

from fastapi import FastAPI, UploadFile, File
import whisper
import uvicorn

app = FastAPI(title="Whisper ASR API")

# 加载模型(启动时加载一次)
model = whisper.load_model("base")

@app.post("/transcribe")
async def transcribe_audio(file: UploadFile = File(...)):
    # 保存上传的文件
    with open("temp_audio.mp3", "wb") as f:
        f.write(await file.read())
    
    # 转录音频
    result = model.transcribe("temp_audio.mp3")
    
    # 返回结果
    return {
        "text": result["text"],
        "segments": result["segments"]
    }

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

启动服务后,可以通过 HTTP 请求使用语音识别功能:

curl -X POST "http://localhost:8000/transcribe" -H "accept: application/json" -H "Content-Type: multipart/form-data" -F "file=@audio_file.mp3"

对于生产环境,还需要考虑添加身份验证、请求限制、错误处理等功能。

4. 实际应用:构建完整的语音笔记应用

4.1 应用架构设计

一个功能完善的语音笔记应用需要合理的架构设计。我们将采用模块化设计,确保系统的可扩展性和可维护性。

4.1.1 系统架构概览
用户界面层
业务逻辑层
数据访问层
核心服务层
Whisper语音识别服务
NLP处理服务
音频处理服务
本地数据库
云存储服务
  • 用户界面层:提供直观的用户交互界面
  • 业务逻辑层:处理应用核心业务规则和流程
  • 数据访问层:负责数据的存储和检索
  • 核心服务层:封装关键技术服务
  • 外部服务集成:与云服务、第三方API等集成
4.1.2 数据流设计

语音笔记应用的典型数据流如下:

用户 UI 音频服务 Whisper服务 NLP服务 数据库 录制/导入音频 处理音频(降噪,格式转换) 语音转文字 文本增强(分段,纠错) 显示转录结果 编辑/确认笔记 保存笔记 存储笔记(文本+音频) 用户 UI 音频服务 Whisper服务 NLP服务 数据库
4.1.3 技术栈选择

基于项目需求,我们推荐以下技术栈:

  • 前端

    • 桌面应用:Electron + React/Vue
    • 移动应用:React Native 或 Flutter
    • Web应用:React/Vue + TypeScript
  • 后端

    • 本地应用:Python + SQLite
    • 服务器应用:FastAPI/Node.js + PostgreSQL
  • 核心技术

    • 语音识别:Whisper
    • 音频处理:FFmpeg, Librosa
    • NLP处理:NLTK, SpaCy, Transformers
    • 同步服务:WebDAV, Firebase

4.2 核心功能实现

接下来,我们将详细实现语音笔记应用的核心功能。我们将以 Python 作为主要开发语言,构建一个桌面端应用。

4.2.1 音频录制模块

我们使用 sounddevicesoundfile 库实现音频录制功能:

import sounddevice as sd
import soundfile as sf
import numpy as np
import threading
import time
from datetime import datetime

class AudioRecorder:
    def __init__(self, samplerate=16000, channels=1):
        self.samplerate = samplerate
        self.channels = channels
        self.recording = False
        self.audio_data = []
        self.stream = None
        self.filename = None
    
    def start_recording(self):
        """开始录音"""
        self.recording = True
        self.audio_data = []
        
        # 生成默认文件名(基于当前时间)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.filename = f"recording_{timestamp}.wav"
        
        # 创建音频流
        self.stream = sd.InputStream(
            samplerate=self.samplerate,
            channels=self.channels,
            dtype='float32',
            callback=self._audio_callback
        )
        
        # 开始流
        self.stream.start()
        print(f"开始录音...")
    
    def _audio_callback(self, indata, frames, time, status):
        """音频流回调函数"""
        if status:
            print(f"音频状态: {status}", file=sys.stderr)
        
        if self.recording:
            self.audio_data.append(indata.copy())
    
    def stop_recording(self):
        """停止录音并保存文件"""
        if not self.recording:
            return None
            
        self.recording = False
        
        # 停止流
        if self.stream:
            self.stream.stop()
            self.stream.close()
        
        # 合并音频数据
        audio = np.concatenate(self.audio_data, axis=0)
        
        # 保存为WAV文件
        sf.write(self.filename, audio, self.samplerate)
        print(f"录音已保存至: {self.filename}")
        
        return self.filename
    
    def is_recording(self):
        """检查是否正在录音"""
        return self.recording

# 使用示例
if __name__ == "__main__":
    recorder = AudioRecorder()
    
    # 开始录音
    recorder.start_recording()
    
    # 模拟录音5秒
    try:
        for i in range(5):
            print(f"录音中... {5 - i}秒")
            time.sleep(1)
    finally:
        # 停止录音
        filename = recorder.stop_recording()
        print(f"录音文件: {filename}")
4.2.2 语音转录服务

实现一个封装 Whisper 的转录服务,支持不同模型和参数配置:

import whisper
import torch
import os
from typing import Dict, Optional, List

class TranscriptionService:
    def __init__(self, model_name: str = "base", device: Optional[str] = None):
        """
        初始化转录服务
        
        Args:
            model_name: Whisper模型名称 (tiny, base, small, medium, large)
            device: 运行设备 (cuda, cpu, mps等),None表示自动选择
        """
        self.model_name = model_name
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        self.model = None
        self.load_model()
        
        # 支持的语言列表
        self.supported_languages = {
            "en": "英语",
            "zh": "中文",
            "ja": "日语",
            "ko": "韩语",
            "fr": "法语",
            "de": "德语",
            # 更多语言...
        }
    
    def load_model(self):
        """加载Whisper模型"""
        print(f"加载模型: {self.model_name} (设备: {self.device})")
        self.model = whisper.load_model(self.model_name, device=self.device)
    
    def transcribe_audio(self, audio_path: str, **kwargs) -> Dict:
        """
        转录音频文件
        
        Args:
            audio_path: 音频文件路径
            **kwargs: 传递给Whisper的额外参数
            
        Returns:
            包含转录结果的字典
        """
        if not os.path.exists(audio_path):
            raise FileNotFoundError(f"音频文件不存在: {audio_path}")
            
        if self.model is None:
            self.load_model()
            
        print(f"开始转录: {audio_path}")
        result = self.model.transcribe(audio_path, **kwargs)
        print(f"转录完成: {audio_path}")
        
        return result
    
    def transcribe_audio_segment(self, audio_data: np.ndarray, **kwargs) -> Dict:
        """
        转录音频数据片段
        
        Args:
            audio_data: 音频数据数组 (1D numpy array)
            **kwargs: 传递给Whisper的额外参数
            
        Returns:
            包含转录结果的字典
        """
        if self.model is None:
            self.load_model()
            
        # 确保音频是单声道
        if len(audio_data.shape) > 1:
            audio_data = np.mean(audio_data, axis=1)
            
        print(f"开始转录音频片段 (长度: {len(audio_data)/16000:.2f}秒)")
        result = self.model.transcribe(audio_data, **kwargs)
        
        return result
    
    def change_model(self, model_name: str):
        """切换不同大小的模型"""
        if model_name != self.model_name:
            self.model_name = model_name
            self.model = None  # 释放旧模型内存
            self.load_model()
    
    def get_available_models(self) -> List[str]:
        """获取可用的模型列表"""
        return ["tiny", "base", "small", "medium", "large"]
    
    def get_supported_languages(self) -> Dict[str, str]:
        """获取支持的语言列表"""
        return self.supported_languages

# 使用示例
if __name__ == "__main__":
    # 创建转录服务实例
    transcriber = TranscriptionService(model_name="base")
    
    # 转录音频文件
    result = transcriber.transcribe_audio(
        "recording_20230701_120000.wav",
        language="zh",
        word_timestamps=True
    )
    
    # 打印结果
    print(f"转录文本: {result['text']}")
    print("\n段落:")
    for segment in result["segments"]:
        print(f"[{segment['start']:.2f}s - {segment['end']:.2f}s]: {segment['text']}")
4.2.3 笔记管理系统

实现笔记的存储、检索和管理功能:

import sqlite3
import os
import json
from datetime import datetime
from typing import List, Dict, Optional, Tuple

class NoteManager:
    def __init__(self, db_path: str = "voice_notes.db"):
        """初始化笔记管理器"""
        self.db_path = db_path
        self._init_database()
    
    def _init_database(self):
        """初始化数据库表结构"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 创建笔记表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS notes (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            content TEXT NOT NULL,
            audio_path TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            language TEXT DEFAULT 'zh',
            duration REAL,  # 音频时长(秒)
            tags TEXT,  # 标签,用逗号分隔
            is_favorite BOOLEAN DEFAULT 0
        )
        ''')
        
        # 创建索引
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_notes_created_at ON notes(created_at)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_notes_tags ON notes(tags)')
        
        conn.commit()
        conn.close()
    
    def create_note(self, title: str, content: str, audio_path: Optional[str] = None, 
                   language: str = 'zh', duration: Optional[float] = None, 
                   tags: Optional[List[str]] = None) -> int:
        """
        创建新笔记
        
        Args:
            title: 笔记标题
            content: 笔记内容
            audio_path: 音频文件路径
            language: 语言代码
            duration: 音频时长(秒)
            tags: 标签列表
            
        Returns:
            新创建笔记的ID
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 处理标签
        tags_str = ','.join(tags) if tags else ''
        
        # 插入笔记
        cursor.execute('''
        INSERT INTO notes (title, content, audio_path, language, duration, tags, created_at, updated_at)
        VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
        ''', (title, content, audio_path, language, duration, tags_str))
        
        note_id = cursor.lastrowid
        conn.commit()
        conn.close()
        
        return note_id
    
    def get_note(self, note_id: int) -> Optional[Dict]:
        """
        获取笔记详情
        
        Args:
            note_id: 笔记ID
            
        Returns:
            笔记信息字典,或None如果不存在
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('SELECT * FROM notes WHERE id = ?', (note_id,))
        row = cursor.fetchone()
        
        conn.close()
        
        if not row:
            return None
            
        # 解析结果
        note = {
            'id': row[0],
            'title': row[1],
            'content': row[2],
            'audio_path': row[3],
            'created_at': row[4],
            'updated_at': row[5],
            'language': row[6],
            'duration': row[7],
            'tags': row[8].split(',') if row[8] else [],
            'is_favorite': bool(row[9])
        }
        
        return note
    
    def update_note(self, note_id: int, **kwargs) -> bool:
        """
        更新笔记
        
        Args:
            note_id: 笔记ID
            **kwargs: 要更新的字段
            
        Returns:
            更新是否成功
        """
        if not kwargs:
            return True  # 没有要更新的字段
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 处理标签
        if 'tags' in kwargs:
            kwargs['tags'] = ','.join(kwargs['tags']) if kwargs['tags'] else ''
        
        # 添加更新时间
        kwargs['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        
        # 构建更新SQL
        fields = ', '.join([f"{k} = ?" for k in kwargs.keys()])
        values = list(kwargs.values()) + [note_id]
        
        cursor.execute(f'UPDATE notes SET {fields} WHERE id = ?', values)
        affected = cursor.rowcount
        
        conn.commit()
        conn.close()
        
        return affected > 0
    
    def delete_note(self, note_id: int) -> bool:
        """
        删除笔记
        
        Args:
            note_id: 笔记ID
            
        Returns:
            删除是否成功
        """
        # 先获取笔记信息,以便删除关联的音频文件
        note = self.get_note(note_id)
        if not note:
            return False
            
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('DELETE FROM notes WHERE id = ?', (note_id,))
        affected = cursor.rowcount
        
        conn.commit()
        conn.close()
        
        # 删除关联的音频文件
        if note['audio_path'] and os.path.exists(note['audio_path']):
            try:
                os.remove(note['audio_path'])
                print(f"已删除音频文件: {note['audio_path']}")
            except Exception as e:
                print(f"删除音频文件失败: {e}")
        
        return affected > 0
    
    def list_notes(self, limit: int = 100, offset: int = 0, 
                  sort_by: str = 'created_at', order: str = 'DESC',
                  tag_filter: Optional[str] = None) -> List[Dict]:
        """
        列出笔记
        
        Args:
            limit: 最大返回数量
            offset: 偏移量(用于分页)
            sort_by: 排序字段
            order: 排序方向 (ASC/DESC)
            tag_filter: 按标签筛选
            
        Returns:
            笔记列表
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 验证排序字段
        valid_sort_fields = ['created_at', 'updated_at', 'title', 'duration']
        if sort_by not in valid_sort_fields:
            sort_by = 'created_at'
            
        # 验证排序方向
        order = order.upper()
        if order not in ['ASC', 'DESC']:
            order = 'DESC'
            
        # 构建查询条件
        where_clause = ''
        params = []
        
        if tag_filter:
            where_clause = 'WHERE tags LIKE ?'
            params.append(f'%{tag_filter}%')
        
        # 执行查询
        query = f'''
        SELECT id, title, created_at, updated_at, duration, tags, is_favorite 
        FROM notes {where_clause} 
        ORDER BY {sort_by} {order} 
        LIMIT ? OFFSET ?
        '''
        
        params.extend([limit, offset])
        cursor.execute(query, params)
        rows = cursor.fetchall()
        
        conn.close()
        
        # 格式化结果
        notes = []
        for row in rows:
            notes.append({
                'id': row[0],
                'title': row[1],
                'created_at': row[2],
                'updated_at': row[3],
                'duration': row[4],
                'tags': row[5].split(',') if row[5] else [],
                'is_favorite': bool(row[6])
            })
            
        return notes
    
    def search_notes(self, query: str) -> List[Dict]:
        """
        搜索笔记
        
        Args:
            query: 搜索关键词
            
        Returns:
            匹配的笔记列表
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 在标题和内容中搜索
        cursor.execute('''
        SELECT id, title, content, created_at 
        FROM notes 
        WHERE title LIKE ? OR content LIKE ? 
        ORDER BY created_at DESC
        ''', (f'%{query}%', f'%{query}%'))
        
        rows = cursor.fetchall()
        conn.close()
        
        notes = []
        for row in rows:
            # 截断内容预览
            preview = row[2][:100] + '...' if len(row[2]) > 100 else row[2]
            
            notes.append({
                'id': row[0],
                'title': row[1],
                'preview': preview,
                'created_at': row[2]
            })
            
        return notes

# 使用示例
if __name__ == "__main__":
    note_manager = NoteManager()
    
    # 创建新笔记
    note_id = note_manager.create_note(
        title="技术会议笔记",
        content="今天讨论了Whisper模型在语音笔记应用中的实现方案...",
        audio_path="recording_20230701_120000.wav",
        duration=320.5,
        tags=["会议", "技术", "AI"]
    )
    print(f"创建新笔记,ID: {note_id}")
    
    # 获取笔记
    note = note_manager.get_note(note_id)
    print(f"获取笔记: {note['title']}")
    
    # 更新笔记
    note_manager.update_note(note_id, title="Whisper技术会议笔记", is_favorite=1)
    
    # 列出笔记
    notes = note_manager.list_notes(limit=10)
    print(f"共有 {len(notes)} 条笔记")
    
    # 
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐