LangChain学习

随着人工智能技术的飞速发展,大语言模型(LLM)已经成为了许多应用的核心组件。然而,要充分发挥LLM的能力,我们需要一个强大的框架来管理模型交互、工具调用以及复杂的推理链。这就是LangChain发挥作用的地方。

什么是LangChain?

LangChain是一个专为开发由大型语言模型驱动的应用而设计的框架。它提供了丰富的工具和抽象层,使开发者能够轻松地构建复杂的AI应用,包括聊天机器人、文档分析系统、自动化代理等。
在本文中,将通过一个具体的例子——网页内容抓取与分析系统,来深入了解LangChain的基本概念和使用方法。

实战案例:网页内容分析

项目包含两个核心部分:

  • MCP服务器:负责网页内容的抓取和处理
  • LangChain客户端: 作为智能代理协调整个过程

MCP服务器架构

MCP(Model Context Protocol)服务器是这个系统的底层服务,负责执行具体的网页处理任务。在我们的示例中,它包含了三个核心功能:

  • fetch_html:获取网页的原始HTML内容
  • extract_text:从HTML转给你提取纯文本内容
  • clean_html:清理HTML并提取主要内容

这些功能被包装成工具,供LangChain代理调用。

@mcp.tool()
async def fetch_html(url: str, timeout: int = 10) -> str:
    """
    获取网页原始HTML内容
    :param url: 网页URL
    :param timeout: 请求超时
    :return: 网页HTML内容
    """
    ...

LangChain代理配置

在LangChain客户端中,我们首先需要配置大语言模型。在这个例子中,我们使用了来自硅基流动的Qwen/Qwen3-8B模型:

siliconflow_llm = ChatOpenAI(
    model="Qwen/Qwen3-8B",
    api_key="your-api-key",
    base_url="https://api.siliconflow.cn/v1",
    temperature=0.1,
    max_tokens=2000,
)

接下来,连接到MCP服务器,并获取其暴露的工具:

client = MultiServerMCPClient(
    {
        "web_parser": {
            "command": "python",
            "args": ["./mcp_server.py"],
            "transport": "stdio",
        }
    }
)
tools = await client.get_tools()

构建智能代理

有了LLM和工具之后,我们可以创建一个智能代理来协调它们的工作。这里我们使用了LangChain的create_tool_calling_agent函数:

prompt = ChatPromptTemplate.from_messages([
    ("system", """你是一个精准的网页数据提取助手。
                 请按照以下格式返回数据:
                 {format_instructions}
                 严格按格式返回JSON
                 重要:只返回JSON,不要任何解释文字。"""),
    ("human", "任务:提取网页内容\n目标URL:{url}"),
    ("placeholder", "{agent_scratchpad}"),
])

agent = create_tool_calling_agent(llm=siliconflow_llm, tools=tools, prompt=prompt)

定义了一个结构化的数据模型来规范输出格式:

class News(BaseModel):
    title: str = Field(default="", description="标题")
    content_type: str = Field(default="", description="标签")
    article_type: str = Field(default="其他", description="内容类型, 转载、原贴、评论、其他")
    content: str = Field(default="", description="页面文本")
    # ... 其他字段

最后,创建AgentExecutor来执行代理任务:

agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=3,
    handle_parsing_errors=True,
    return_intermediate_steps=True,
)

执行流程

当代理执行任务时,经历一下步骤:

  1. 接收用户的指令(例如抓取特定URL的内容)
  2. 分析任务需求并决定需要调用哪些工具
  3. 调用相应的工具
  4. 处理工具返回的结果
  5. 根据需要进一步调用其他工具
  6. 最终将结构化数据返回给用户

关键概念总结

  1. LLM集成:LangChain支持多种大语言模型,包括OpenAI、Anthropic以及其他兼容API的模型。
  2. 工具(Tools):外部函数或服务,可以被代理调用来执行特定任务。在我们的例子中,MCP服务器提供的网页处理功能就是工具。
  3. 提示模板(Prompt Templates):用于构造发送给LLM的提示,确保一致性和可维护性。
  4. 代理(Agents):决策层,决定采取什么行动,调用什么工具,以及如何响应用户。
  5. 执行器(Executors):负责实际执行代理的决策,调用工具并处理结果。

小结

这篇文章介绍了LangChain的基本概念,并通过一个网页内容分析的实际例子展示了如何使用LangChain构建智能应用。如何配置LLM、连接外部工具、构建代理并执行复杂任务。

完整代码

安装依赖

pip install fastmcp beautifulsoup4 langchain-mcp-adapters langchain-openai langchain langchain-community html2text
pip install readability-lxml>=0.8.4.1
# mcp_server.py
import asyncio
from typing import Optional

import aiohttp
from mcp.server.fastmcp import FastMCP
from bs4 import BeautifulSoup
import html2text
from readability import Document

mcp = FastMCP("WebParser")

class SessionManager:
    def __init__(self):
        self.session: Optional[aiohttp.ClientSession] = None

    async def get_session(self) -> aiohttp.ClientSession:
        if self.session is None or self.session.closed:
            timeout = aiohttp.ClientTimeout(total=30)
            self.session = aiohttp.ClientSession(
                timeout=timeout,
                headers={"User-Agent": "Mozilla/5.0 (compatible; LangChainBot/1.0)"},
            )
        return self.session

    async def close(self):
        if self.session and not self.session.closed:
            await self.session.close()

session_mgr = SessionManager()

@mcp.tool()
async def fetch_html(url: str, timeout: int = 10) -> str:
    """
    获取网页原始HTML内容
    :param url: 网页URL
    :param timeout: 请求超时
    :return: 网页HTML内容
    """
    session = await session_mgr.get_session()
    try:
        async with session.get(url, timeout=timeout) as resp:
            resp.raise_for_status()
            # 先获取bytes内容
            content_bytes = await resp.read()
            # 检测编码
            import chardet

            encoding_detected = chardet.detect(content_bytes)["encoding"]
            try:
                if encoding_detected:
                    html_content = content_bytes.decode(
                        encoding_detected, errors="replace"
                    )
                else:
                    html_content = content_bytes.decode("utf-8", errors="replace")
            except (UnicodeDecodeError, LookupError):
                # 如果检测的编码失败,尝试常见编码
                for encoding in ["utf-8", "gbk", "gb2312", "latin-1"]:
                    try:
                        html_content = content_bytes.decode(encoding, errors="replace")
                        break
                    except UnicodeDecodeError:
                        continue
                else:
                    # 所有编码都失败,使用replace模式
                    html_content = content_bytes.decode("utf-8", errors="replace")
            return html_content
    except aiohttp.ClientError as e:
        print(f"获取网页HTML失败: {e}")
        return ""


def _extract_text_sync(html: str, selector: Optional[str]) -> str:
    """同步提取函数"""
    soup = BeautifulSoup(html, "html.parser")
    if selector:
        elements = soup.select(selector)
        return "\n".join(e.get_text(strip=True) for e in elements)
    return soup.body.get_text(strip=True) if soup.body else ""

@mcp.tool()
async def extract_text(html: str, selector: Optional[str] = None) -> str:
    """
    使用css选择器提取纯文本,无selector则提取body全部文本
    :param html: 网页源码
    :param selector: css选择器
    :return: 纯文本
    """
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, _extract_text_sync, html, selector)


def _clean_html_sync(html_content: str) -> str:
    """
    结合readability-lxml、BeautifulSoup和html2text来提取和清理HTML内容
    步骤:readability提取正文 -> BeautifulSoup清理 -> html2text转换
    :param html_content: 输入HTML
    :return: 清理后的HTML
    """
    if not html_content:
        return ""

    # 第一步:readability提取正文
    try:
        doc = Document(html_content)
        main_content = doc.summary()
    except Exception as e:
        print(f"Readability提取失败,使用原始HTML结果: {e}")
        main_content = html_content

    # 第二步:BeautifulSoup清理无关元素
    try:
        soup = BeautifulSoup(main_content, "html.parser")
        for tag in soup(
            [
                "script",
                "style",
                "nav",
                "header",
                "footer",
                "aside",
                "advertisement",
                "sidebar",
                "comment",
            ]
        ):
            tag.decompose()
        for tag in soup.find_all():
            if not tag.get_text(strip=True):
                tag.decompose()

        cleaned_html = str(soup)
    except Exception as e:
        print(f"BeautifulSoup清理失败,使用readability结果: {e}")
        cleaned_html = main_content

    # 第三步:使用html2md转换为精简的Markdown
    try:
        h = html2text.HTML2Text()
        h.ignore_links = True
        h.ignore_images = True
        h.ignore_tables = True
        h.ignore_emphasis = True
        h.body_width = 0
        h.single_line_break = True
        h.ul_item_mark = "-"

        markdown_content = h.handle(cleaned_html)
        lines = [line.strip() for line in markdown_content.splitlines() if line.strip()]
        final_content = "\n".join(lines)

        return final_content
    except Exception as e:
        print(f"HTML转markdown失败,返回清理后的html: {e}")
        return cleaned_html


@mcp.tool()
async def clean_html(html_content: str) -> str:
    """异步清洗HTML"""
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, _clean_html_sync, html_content)


async def shutdown():
    """MCP服务器关闭时清理资源"""
    await session_mgr.close()


async def main():
    """主函数"""
    await mcp.run_stdio_async()
# scraper_agent.py
import asyncio
from datetime import datetime
from typing import Optional

from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
from langchain_classic.agents import (
    AgentExecutor,
    create_tool_calling_agent,
)
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai import ChatOpenAI


class News(BaseModel):
    title: str = Field(default="", description="标题")
    content_type: str = Field(default="", description="标签")
    article_type: str = Field(
        default="其他", description="内容类型, 转载、原贴、评论、其他"
    )
    content: str = Field(default="", description="页面文本")
    ...

parser = PydanticOutputParser(pydantic_object=News)

async def main():
    # 1. 配置大模型
    siliconflow_llm = ChatOpenAI(
        model="Qwen/Qwen3-8B",  # 模型名称从模型广场获取
        api_key="your_api_key",  # 硅基流动 API Key
        base_url="https://api.siliconflow.cn/v1",
        temperature=0.1,
        max_tokens=2000,
    )
    # 2. 连接MCP服务器(本地stdio模式)
    client = MultiServerMCPClient(
        {
            "web_parser": {
                "command": "python",
                "args": ["./mcp_server.py"],
                "transport": "stdio",
            }
        }
    )
    # 3. 获取MCP工具并创建Agent
    tools = await client.get_tools()
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                """你是一个精准的网页数据提取助手。
                    请按照以下格式返回数据:
                    {format_instructions}
                    严格按格式返回JSON
                    重要:只返回JSON,不要任何解释文字。
                """,
            ),
            ("human", "任务:提取网页内容\n目标URL:{url}"),
            ("placeholder", "{agent_scratchpad}"),
        ]
    )
    print("可用工具:", [tool.name for tool in tools])
    # 4. 构建带结构化输出的Agent
    agent = create_tool_calling_agent(llm=siliconflow_llm, tools=tools, prompt=prompt)
    # 5. 定义爬取任务
    target_url = "目标URL"
    # 6. 创建AgentExecutor并执行
    agent_executor = AgentExecutor(
        agent=agent,
        tools=tools,
        verbose=True,
        max_iterations=3,  # 减少迭代次数避免无限循环
        handle_parsing_errors=True,  # 添加错误处理
        return_intermediate_steps=True,  # 返回中间步骤用于调试
    )
    response = await agent_executor.ainvoke(
        {
            "url": target_url,
            "format_instructions": parser.get_format_instructions(),
            "agent_scratchpad": [],
        }
    )
    # 7. 提取最终结果
    final_output = response.get("output", "")
    if final_output:
        try:
            news = parser.parse(final_output)
            print("解析成功")
            print(f"{news.title}")
        except Exception as e:
            print(f"解析失败:{e}")
            print(f"原始输出:{final_output}")
    else:
        print("无有效输出")


if __name__ == "__main__":
    asyncio.run(main())

运行结果:
LangChain提取文章信息

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐