说明:

随着技术的发展,RNN 已经逐渐被结构更灵活、计算效率更高的transformer取代,后者已经成为当前自然语言处理的主流方法。

尽管如此,RNN 仍然具有重要的学习价值。它所体现的“循环建模上下文”的思想,
不仅为 LSTM 和 GRU 等改进模型奠定了基础,也有助于我们更好地理解 Transformer 等
更复杂的架构

一、项目说明

本次项目我们分为以下文件模块:

本案例旨在实现一个用于手机输入法的智能词语联想模型。

具体需求为:根据用户当前已输入的文本内容,预测下一个可能输入的词语,要求返回概率最高的 5 个候选词供用户选择。

二、项目分析

(一)  数据集处理

1. 任务数据需求

模型要根据用户已输入文本预测下一个词语,因此训练数据需具备自然语言上下文连续性贴近真实使用场景的特点。

2. 可选数据来源

  • 用户真实输入内容:如聊天记录、搜索历史、输入法日志等。这类数据能精准反映真实输入场景,帮助模型学习用户输入习惯和上下文联想模式。
  • 开放领域对话语料:如论坛回复、社交平台评论、闲聊对话等。这类语料口语化特征强,可提升模型在真实输入场景中的泛化能力。

3. 所用数据集

本任务使用的数据集为:https://huggingface.co/datasets/Jax-dan/HundredCV-Chat

4. 数据处理流程

为构造 “下一词预测” 的训练样本,需先对原始语料分词,再通过滑动窗口提取连续的上下文片段,以每个窗口的 “下一个词” 作为预测目标,最终构成 “输入 - 输出对”。

(二) 项目结构设计

1. 模型结构(三部分组成)

层级 功能解读
嵌入层(Embedding) 将输入的词或字索引映射为稠密向量表示,为后续神经网络处理提供可计算的语义载体。
循环神经网络层(RNN) 建模输入序列的上下文信息,输出最后一个时间步的隐藏状态,以此捕捉序列的上下文特征。
输出层(Linear) 将隐藏状态映射到与词表大小一致的维度,生成对下一个词的概率预测,完成 “下一词预测” 任务。

2. 训练方案

损失函数:采用 CrossEntropyLoss。因 “下一词预测” 本质是多分类问题,该损失函数结合了 softmax(将输出转化为概率分布)和交叉熵(衡量预测与真实标签的差异),可有效指导模型学习。

优化器:使用 Adam 优化器。其具备较强的收敛能力和稳定性,能在训练过程中高效更新模型参数。

三、项目实现

本次项目分为如下文件夹:

可以点击下面的链接下载本项目所需要的数据集

通过网盘分享的文件:synthesized_.jsonl
链接: https://pan.baidu.com/s/1ufpAX5K18ndgvzKa0dDIbA 提取码: x452

数据集存放地址是上图文件夹当中的raw!!!

环境配置:

pytorch 深度学习框架,用于模型的构建、训练与推理,是实现神经网络模型的核心工具。
jieba 高效中文分词工具,对原始中文文本进行分词预处理,为后续 NLP 任务提供基础词单元。
gensim 用于训练词向量模型(如 Word2Vec、FastText),助力模型理解词语语义关系。
datasets Hugging Face 提供的数据处理库,实现大规模数据集的高效加载和预处理。
tqdm 显示进度条,实时监控训练与数据处理进度,提升任务执行的可感知性。
Jupyter Notebook

交互式开发环境,用于编写、测试和可视化模型代码与实验过程,支持代码分步执行和结果即时反馈。

等环境配置好了,就可以实现如下代码,具体注释都在代码当中!

1. config超参数配置

# config.py

from pathlib import Path

ROOT_DIR = Path(__file__).parent.parent

RAW_DATA_DIR = ROOT_DIR / "data" / "raw"
PROCESSED_DATA_DIR = ROOT_DIR / "data" / "processed"
LOGS_DIR = ROOT_DIR / "logs"
MODELS_DIR = ROOT_DIR / "models"

SEQ_LEN = 5
BATCH_SIZE = 64
EMBEDDING_DIM = 128
HIDDEN_SIZE = 256
LEARNING_RATE = 1e-3
EPOCHS = 10

2. 数据集预处理

# process.py

# 本模块负责将原始数据进行清洗、分词、编码与划分,最终生成模型可以直接读取的标准格式数据集,并保存到jsonl文件当中
import jieba
# 最后的输出分别是 input target
import pandas as pd
import numpy as np
from pathlib import Path

from sympy.codegen.fnodes import cmplx
from tqdm import tqdm


from sklearn.model_selection import train_test_split
from transformers.data.metrics.squad_metrics import compute_predictions_log_probs

import config

def build_dataset(sentences,word2index):
    """
    把分词、编码后的句子,转换成 “输入 - 目标” 对
    :param sentences: 要处理的句子的列表
    :param word2index: 把词转换成数字
    :return:
    """
    # jieba.lcut(sentence) 把句子进行分词,然后通过word2index把句子转化为词向量,如果词不在列表里,就返回0
    indexed_train_sentences = [[word2index.get(token, 0) for token in jieba.lcut(sentence)]
                               for sentence in sentences]

    # 使用滑动窗口整理数据
    dataset = []

    # 整理成input和token的格式
    # [{'input':[1,2,3,4,5],'target':6},{'input':[2,3,4,5,6},'target':7]
    for sentence in tqdm(indexed_train_sentences,desc="构建数据集"):
        for i in range(len(sentence) - config.SEQ_LEN):
            input = sentence[i:i + config.SEQ_LEN]
            target = sentence[i + config.SEQ_LEN]
            dataset.append({'input': input, 'target': target})
    return dataset


def process():
    print("开始处理数据")
    # 1.读取文件
    # json文件读取的时候取决于你的json文件是什么格式的 注意orient的参数
    # 第一个传的参数可以是json字符串 也可以是一个“相对路径” 这里需要注意是两个点
    # 但是系统会自动率先识别是一个json字符串 其次再认为是一个路径
    # 如何动态识别出“绝对路径” 让脚本在不同的环境都能被识别出来
    # 使用Path来解决
    df = pd.read_json(config.RAW_DATA_DIR / "synthesized_.jsonl",lines = True,orient = 'records')
    
    # df = pd.read_json(config.RAW_DATA_DIR / "synthesized_.jsonl",lines = True,orient = 'records').sample(frac = 0.1)

    # 2.提取句子
    sentences = []
    # json字符串里面的数组在python里面对应的是列表,所以这里转换的是列表
    # 这里是为了去掉说话人!
    for dialog in df['dialog']:
        for sentence in dialog:
            sentences.append(sentence.split(':')[1])
    print(sentences[0:10])
    print(f'句子总数{len(sentences)}')

    # 3.划分数据集,将句子分为测试集和训练集
    train_sentences,test_sentences = train_test_split(sentences,test_size = 0.2)

    # 4.构建词表
    # 把训练集循环切词
    vocab_set = set()
    # 查看进程
    for sentence in tqdm(train_sentences,desc = "构建词表"):
        vocab_set.update(jieba.lcut(sentence))

    # 在词表前面需要加上<unk>特殊字符,但是set是无序性,所以我们要先转化成为列表再加入、
    # 给模型没见过的词一个 “统一的代号” :万能替补
    vocab_list = ['<unk>'] + list(vocab_set)
    print(f'词表大小:{len(vocab_list)}')

    # 5.保存词表
    # 把词表保存到文件里,格式为文本文件,一行一个词
    with open(config.MODELS_DIR / 'vocab.txt', 'w',encoding = 'utf-8') as f:
        f.write('\n'.join(vocab_list))


    # 6.构建训练集
    # word2index 和 index2word 是 NLP 中 “词 - 索引” 双向转换的核心工具,是文本从 “人类可理解” 到 “模型可处理” 再到 “人类可理解” 的关键环节
    word2index = {word:index for index,word in enumerate(vocab_list)}
    train_dataset = build_dataset(train_sentences,word2index)


    # 7.保存训练集,保存成为json格式
    pd.DataFrame(train_dataset).to_json(config.PROCESSED_DATA_DIR / "train.jsonl",lines=True,orient = 'records')

    # 8.构建测试集
    test_dataset = build_dataset(test_sentences,word2index)

    # 9.保存测试集
    pd.DataFrame(test_dataset).to_json(config.PROCESSED_DATA_DIR / "test.jsonl",lines=True,orient = 'records')


    print("数据处理完成")

if __name__ == '__main__':
    process()

3. 自定义数据集

# dataset.py

# 1.定义Dataset
# 模型训练时需要按批次读取数据,这个文件负责把process.py生成的train.jsonl/test.jsonl转换成模型能直接用的格式:
import pandas as pd
from torch.utils.data import Dataset, DataLoader
import torch
from pathlib import Path
import config

# 只需要传入不同的路径就可以读到test.jsonl和train.jsonl
# process步骤里把语句转化成了词向量,存入了jsonl文件当中,但是对于python而言,这些的格式是列表,pytorch只认识tensor
# 这就是__getitem__需要做的事
class InputMethodDataset(Dataset):

    def __init__(self,path): # 初始化
        self.data = pd.read_json(path,lines = True,orient = 'records').to_dict(orient = 'records')

    def __len__(self): # 告诉模型有多少个样本
        return len(self.data)

    def __getitem__(self, index): # 按照索引取货
        input_tensor = torch.tensor(self.data[index]['input'],dtype = torch.long)
        target_tensor = torch.tensor(self.data[index]['target'],dtype = torch.long)
        return input_tensor, target_tensor



# 2.提供一个获取dataloader的方法
def get_dataloader(train=True):
    path = config.PROCESSED_DATA_DIR / ('train.jsonl' if train else 'test.jsonl')
    dataset = InputMethodDataset(path)
    # 实现并行计算!!!
    # shuffle会在每个epoch之前打乱顺序
    return DataLoader(dataset,batch_size = config.BATCH_SIZE,shuffle = True)


if __name__ == '__main__':
    train_dataloader = get_dataloader()
    test_dataloader = get_dataloader(train=False)
    print(len(train_dataloader))
    print(len(test_dataloader))

    # input_tensor,target_tensor的形状是什么样子的?
    # input_tensor.shape : [batch_size,seq_len]
    # target_tensor.shape : [batch_size]
    for input_tensor, target_tensor in train_dataloader:
        print(input_tensor.shape) #[64,5]
        print(target_tensor.shape) # [64]
        break

# Q : 为什么input_tensor和target_tensor的形状不一样?
# A : input_tensor.shape : [batch_size,seq_len],target_tensor.shape : [batch_size]
# 因为需要使用滑动窗口(5)来预测下一个词

4. 定义模型

# model.py

from torch import nn
import config

# embedding -> RNN -> linear
class InputMethodModel(nn.Module):

    def __init__(self,vocab_size):
        super().__init__()
        # 嵌入层 把离散的词转化为连续的向量
        # num_embeddings : 词汇表大小
        # embedding_dim : 词向量维度
        self.embedding = nn.Embedding(num_embeddings=vocab_size, # 词表大小
                                      embedding_dim=config.EMBEDDING_DIM) # 每个向量的维度
        # input_size : 词向量的维度
        # hidden_size : 隐藏状态维度,一般比input_size要大
        self.rnn = nn.RNN(input_size=config.EMBEDDING_DIM,
                          hidden_size = config.HIDDEN_SIZE,
                          num_layers=1,
                          batch_first=True)

        # in_features : RNN最后一个时间步的隐藏状态
        # out_features : 最终要映射到词表那么大的维度上进行选词
        self.linear = nn.Linear(in_features=config.HIDDEN_SIZE,
                                out_features=vocab_size)

        # x的形状是什么?
        # x.shape : [batch_size,seq_len]
    # forward : 数据在模型中的流动逻辑
    def forward(self, x):
        # embed.shape : [batch_size,seq_len,embedding_dim]
        embed = self.embedding(x)
        # output里面有rnn最后一层每一步的时间状态 hn只有最后一步的隐藏状态,但是包含每一层每一个方向(这里不用,直接_)
        # output.shape : [batch.size,seq_len,hidden_size]
        output,_ = self.rnn(embed)
        # last_hidden_size.shape : [batch_size,hidden_size]
        last_hidden_state = output[:,-1,:]
        # output.shape.shape : [batch_size,vocal_size]
        output = self.linear(last_hidden_state)
        return output

5. 模型训练

# train.py

import time
import torch
from torch.utils.data import dataloader
from tqdm import tqdm
from torch.utils.tensorboard import SummaryWriter
from dataset import get_dataloader
from model import InputMethodModel
import config

def train_one_epoch(model,dataloader,loss_fn,optimizer,device):
    """
    :param model: 模型
    :param dataloader: 数据集
    :param loss_fn: 损失函数
    :param optimizer: 优化器
    :param device: 设备
    :return: 当前epoch的平均loss
    """

    model.train()
    total_loss = 0
    for inputs,targets in tqdm(dataloader,desc="train"):
        inputs = inputs.to(device)
        targets = targets.to(device)
        # inputs.shape : [batch_size,seq_len]
        # targets.shape : [batch_size]

        # 前向传播
        # 预测结果
        outputs = model(inputs)

        loss = loss_fn(outputs, targets)

        # 反向传播
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        total_loss += loss.item()

    return total_loss / len(dataloader)


def train():
    # 1.确认设备
    device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

    # 2.数据集
    # dataset 当中定义的 get_dataloader方法
    dataloader = get_dataloader()

    # 3.词表
    with open(config.MODELS_DIR / 'vocab.txt', 'r',encoding='utf-8') as f:
        vocab_list = [line.strip() for line in f.readlines()]

    # 4.模型
    model = InputMethodModel(vocab_size = len(vocab_list)).to(device)

    # 5.损失函数
    loss_fn = torch.nn.CrossEntropyLoss()

    # 6.优化器
    optimizer = torch.optim.Adam(model.parameters(),lr = config.LEARNING_RATE)

    # 7. tensorboard writer
    writer = SummaryWriter(log_dir = config.LOGS_DIR/time.strftime("%Y-%m-%d_%H-%M-%S"))

    best_loss = float('inf')
    # 开始训练
    for epoch in range(1,1 + config.EPOCHS):
        print("=" * 10,f" EPOCH {epoch} ",'=' * 10)
        # 训练一个epoch逻辑
        loss = train_one_epoch(model, dataloader,loss_fn,optimizer , device)
        print(f'loss: {loss}')

        # 记录训练结果
        writer.add_scalar('loss',loss,epoch)

        # 保存模型
        if loss < best_loss:
            best_loss = loss
            torch.save(model.state_dict(),config.MODELS_DIR / 'best.pth')
            print("model saved")

    writer.close()


if __name__ == '__main__':
        train()

6. 模型预测

# predit.py

import torch
import config
import jieba
from model import InputMethodModel

def predict_batch(model,inputs):
    """
    批量预测
    :param model: 模型
    :param inputs: 输入 shape:[batch_size,seq_len]
    :return: 预测结果 shape:[batch_size,5]
    """

    # 5.预测逻辑
    model.eval()
    with torch.no_grad():
        output = model(inputs)
        # output.shape : [batch_size,vocab_size]
    top5_indexes = torch.topk(output, 5).indices
    # top5_indexes.shape : [batch_size,5]

    top5_indexes_list = top5_indexes.tolist()
    return top5_indexes_list


def predict(text,model,word2index,index2word,device):

    # 4.处理输入
    tokens = jieba.lcut(text)
    indexes = [word2index.get(tokens,0) for tokens in tokens]
    input_tensor = torch.tensor([indexes],dtype = torch.long)
    input_tensor = input_tensor.to(device)

    # 5.预测逻辑
    top5_indexes_list = predict_batch(model,input_tensor)
    top5_tokens = [index2word[index] for index in top5_indexes_list[0]]
    return top5_tokens


def run_predict():

    # 准备各种资源
    # 1.设备
    device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

    # 2.词表
    with open(config.MODELS_DIR / 'vocab.txt', 'r',encoding='utf-8') as f:
        vocab_list = [line.strip() for line in f.readlines()]

    word2index = {word: index for index, word in enumerate(vocab_list)}
    index2word = {index: word  for index, word in enumerate(vocab_list)}

    # 3.模型
    model = InputMethodModel(vocab_size = len(vocab_list)).to(device)
    model.load_state_dict(torch.load(config.MODELS_DIR / 'best.pth'))

    print("欢迎使用输入法模型(输入q或者quit退出)")
    input_history = ''
    while True:
        user_input = input("> ")
        if user_input in ['q','quit']:
            print("bye")
            break
        if user_input.strip() == '':
            print("请输入内容")
            continue

        input_history += user_input
        print(f'输入历史:{input_history}')
        top5_tokens = predict(input_history,model,word2index,index2word,device)
        print(f'预测结果:{top5_tokens}')




if __name__ == '__main__':
    run_predict()

7. 效果评估

# evaluate.py

import torch
import config
from model import InputMethodModel
from dataset import get_dataloader
from predict import predict_batch

def evaluate(model,test_dataloader,device):

    top1_acc_count = 0
    top5_acc_count = 0
    total_count = 0

    # with torch.no_grad():
    for inputs,targets in test_dataloader:
        inputs = inputs.to(device)
        # inputs.shape : [batch_size,seq_len]
        targets = targets.tolist()
        # targets.shape : [batch_size]
        top5_indexes_list = predict_batch(model,inputs)

        for targets,top5_indexes in zip(targets,top5_indexes_list):
            total_count += 1
            if targets == top5_indexes[0]:
                top1_acc_count += 1
            if targets in top5_indexes:
                top5_acc_count += 1

    return top1_acc_count/total_count, top5_acc_count/total_count


def run_evaluate():
    # 准备资源
    # 1.设备
    device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

    # 2.词表
    with open(config.MODELS_DIR / 'vocab.txt', 'r',encoding='utf-8') as f:
        vocab_list = [line.strip() for line in f.readlines()]
    print("词表加载成功")

    # 3.模型
    model = InputMethodModel(vocab_size = len(vocab_list)).to(device)
    model.load_state_dict(torch.load(config.MODELS_DIR / 'best.pth'))
    print("模型加载成功")

    # 4.数据集
    test_dataloader = get_dataloader(train = False)

    # 5.评估逻辑
    top1_acc,top5_acc = evaluate(model,test_dataloader,device)
    print("评估结果")
    print(f"top1_acc:{top1_acc}")
    print(f"top5_acc:{top5_acc}")


if __name__ == "__main__":
    run_evaluate()

四、项目反思

尽管循环神经网络(RNN)在处理序列数据方面具有天然优势,

但它在实际应用中面临一个非常严重的问题:长期依赖建模困难。

这指的是:在训练过程中,当输入序列很长时,模型难以有效学习早期输入对最终输出的影响。

上述问题的根本原因在于训练过程中存在的梯度消失或梯度爆炸问题。

在训练 RNN 时,采用的是时间反向传播(Backpropagation Through Time, BPTT)方法,在反向传播过程中,梯度需要在每个时间步上不断链式传递,这就导致模型只能学到短期依赖,而无法学到长期依赖。

另外,若𝑊ℎ大于 1(大到𝑡𝑎𝑛ℎ′ (𝑢𝑡) ∙ 𝑊ℎ大于 1),那么经过𝑡𝑎𝑛ℎ′ (𝑢𝑡) ∙ 𝑊ℎ的多次连乘,
早期路径值就会指数级增长,这个现象称为梯度爆炸,梯度爆炸又会使得参数更新极不稳定。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐