1. 灵感闪现:打造一个 AI 批量翻译官

故事的开头,我雄心勃勃,想用 Gemini AI 打造一个能自动翻译整个项目文档的 Agent。目标很明确:

  • 输入一个 docs 文件夹。
  • 输出一个 docs-zh 文件夹,里面是所有翻译好的中文版 Markdown 文件。
  • 图片等非文本文件,要能自动复制过去。

很快,我的第一个版本诞生了,一个勤勤恳恳的“老实人”脚本。

2. 初版诞生:勤恳的“蜗牛” 🐌

这个脚本逻辑清晰,简单粗暴:

  1. 用一个 for 循环遍历所有文件。
  2. 读一个英文文件。
  3. 向 Gemini API 发起一个翻译请求。
  4. 等啊等… 等到 API 返回结果。
  5. 把翻译好的内容写入新文件。
  6. 主动 sleep(1) 休息一秒,生怕 API 服务器太累。
  7. 回到第 2 步,处理下一个文件。

它还带上了 tqdm 进度条,看起来很专业。但一跑起来,我就发现了问题:40 多个文件,进度条像被施了定身法,慢得令人发指!

3. 灵魂拷问:瓶颈到底在哪?🤔

看着龟速爬行的进度条,我陷入了沉思。这点文件读写,CPU 根本不眨眼。那问题出在哪?很快,我锁定了三大元凶:

  • 元凶一:【网络延迟】
    每次 API 调用,数据都要在我的电脑和谷歌的服务器之间跑一个来回。这段旅程的时间,远比代码执行本身要长得多。在等待的每一毫秒,我的程序都在“摸鱼”。

  • 元凶二:【主动“自残”】
    那个 time.sleep(1),本意是好的,为了防止请求太快被封。但它也意味着,每翻译一个文件,就要强制罚站 1 秒。40 个文件就是 40 秒纯粹的等待!

  • 元凶三(也是罪魁祸首):【串行执行】
    我的脚本是个“一根筋”,它必须等上一个文件从请求->等待->返回->写入的全过程结束后,才开始处理下一个。这就像一个只有一个窗口的办事大厅,效率奇低。

4. 救星登场:asyncio 并发编程 🚀

问题的核心是等待,而不是计算。那么解决方案就是:在等待的时候,去做别的事!

这就是 Python 的异步编程 asyncio 登场的时候了。

它的理念就像一个优秀的厨师:

不会傻傻地盯着一锅汤等它煮沸(等待网络),而是在炖汤的同时,去洗菜、切菜(发起其他网络请求)。

我果断地对脚本进行了“引擎升级”:

  1. 切换到异步函数:把核心的翻译函数 def 改为 async def
  2. 调用异步 API:使用 await model.generate_content_async(),告诉程序“你先去请求,我不等你,我去忙别的”。
  3. 批量下达任务:用 asyncio.gather(*tasks) 把所有文件的翻译任务一次性“扔”进事件循环,让 Python 自己去调度。
  4. 智能限流:抛弃 time.sleep(1),改用 asyncio.Semaphore(10),这就像给办事大厅开了 10 个窗口,同时处理 10 个任务,既保证了效率,又不会因为瞬间请求太多而挤爆服务器。

5. 最终章:从“蜗牛”到“火箭”的蜕变 🎉

新脚本一运行,效果立竿见影!

  • 之前:进度条一格一格地挪,总耗时好几分钟。
  • 现在:进度条“唰”地一下飞速前进,整个过程在几十秒内就完成了!

结论:
这次改造的核心,是认清了任务的本质。当你的程序大部分时间都在等待 I/O(网络请求、文件读写)时,并发就是那把开启效率之门的钥匙。通过 asyncio,我们把程序从一个“单线程的笨蛋”,变成了一个“懂得多任务调度的天才”,实现了质的飞跃。

import os
import time
import shutil
import asyncio  # 引入 asyncio 库
import google.generativeai as genai
from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio  # 引入tqdm的异步版本
from dotenv import load_dotenv

# --- 配置 ---
load_dotenv()
os.environ["http_proxy"] = "http://127.0.0.1:7899"
os.environ["https_proxy"] = "http://127.0.0.1:7899"

GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")


genai.configure(api_key=GOOGLE_API_KEY)
model = genai.GenerativeModel("gemini-2.5-flash")  # 建议使用 latest 标签

prompt_template = """
你是一名专业的IT技术文档翻译员。
请将下面的英文 Markdown 文本翻译成简体中文。

翻译要求:
1. 完整保留原始的 Markdown 格式。
2. 技术术语需翻译得精准、专业且符合行业习惯 (例如: 'repository' -> '仓库', 'commit' -> '提交', 'pull request' -> '拉取请求')3. 不要翻译代码块 (```...```)、行内代码 (`...`)、URL链接和终端命令。
4. 确保翻译后的语言流畅、专业,适合开发者阅读。
5. 直接返回翻译后的文本内容,不要包含任何额外的解释或前言。

英文原文如下:
---
{english_text}
---
"""


# 将翻译函数改为异步函数
async def translate_text_async(text: str, model: genai.GenerativeModel) -> str:
    prompt = prompt_template.format(english_text=text)
    safety_settings = [
        {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
        {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
        {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
        {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
    ]
    # 使用异步版本的 generate_content
    response = await model.generate_content_async(prompt, safety_settings=safety_settings)
    return response.text.strip()


# 定义一个处理单个文件的异步任务
async def process_file_async(input_path, output_dir, model, stats, pbar):
    relative_path = os.path.relpath(input_path, os.path.dirname(os.path.commonprefix([input_path, output_dir])))
    output_path = os.path.join(output_dir, relative_path)
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    try:
        if input_path.lower().endswith(('.md', '.markdown')):
            with open(input_path, 'r', encoding='utf-8') as f:
                content = f.read()

            if not content.strip():
                translated_content = ""
            else:
                # 等待异步翻译完成
                translated_content = await translate_text_async(content, model)

            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(translated_content)
            stats['success'] += 1
        else:
            shutil.copy2(input_path, output_path)
            stats['copied'] += 1
    except Exception as e:
        stats['failed'] += 1
        stats['failures'].append(f"处理失败: {relative_path}, 原因: {e}")
    finally:
        pbar.update(1)


# 主执行函数
async def main(input_dir: str, output_dir: str):
    files_to_process = []
    for root, _, files in os.walk(input_dir):
        for file in files:
            files_to_process.append(os.path.join(root, file))

    if not files_to_process:
        print(f"在目录 '{input_dir}' 中没有找到任何文件。")
        return

    start_time = time.time()
    stats = {'success': 0, 'failed': 0, 'copied': 0, 'failures': []}

    os.makedirs(output_dir, exist_ok=True)

    print(f"开始并发处理目录 '{input_dir}'...")

    # 创建一个 Semaphore 来控制并发数量,防止瞬间请求过多导致API拒绝
    # 比如,我们限制最多同时有 10 个请求在进行
    CONCURRENT_LIMIT = 10
    semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)

    async def throttled_process_file(input_path, output_dir, model, stats, pbar):
        async with semaphore:
            await process_file_async(input_path, output_dir, model, stats, pbar)
            # 在这里可以加一个非常小的延时,进一步避免触发频率限制
            await asyncio.sleep(0.1)

    with tqdm(total=len(files_to_process), desc="翻译进度", unit="个文件") as pbar:
        # 创建所有文件的处理任务
        tasks = [throttled_process_file(fp, output_dir, model, stats, pbar) for fp in files_to_process]
        # 等待所有任务完成
        await asyncio.gather(*tasks)

    # 打印总结报告
    duration = time.time() - start_time
    print("\n" + "=" * 25 + " 任务总结 " + "=" * 25)
    print(f"| 总耗时: {duration:.2f} 秒")
    print(f"| 总处理文件数: {len(files_to_process)}")
    print(f"| -> 翻译成功: {stats['success']}")
    print(f"| -> 资源文件复制成功: {stats['copied']}")
    print(f"| -> 处理失败: {stats['failed']}")

    if stats['failures']:
        print("\n" + "-" * 25 + " 失败详情 " + "-" * 25)
        for fail_info in stats['failures']:
            print(f"| {fail_info}")

    print("\n翻译任务全部完成!")


if __name__ == "__main__":
    input_directory = r"docs-eng"
    output_directory = "docs-zh-async"

    # 运行异步主函数
    asyncio.run(main(input_directory, output_directory))
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐