LangChain源码分析之JSONLoader
本文分析了LangChain中JSONLoader的源码。LangChain的JSONLoader通过集成强大的jq查询语言,实现了对JSON数据的灵活、高效解析。它采用预编译和懒加载策略优化性能,并能智能处理不同的文件格式。其设计精妙之处在于提供了一个关键开关,让内容提取方式在简单键值访问与复杂表达式查询间切换,完美平衡了易用性与灵活性。此外,它还支持通过自定义函数来丰富元数据,并结合早期验证机
一.jq_schema(前置学习)
1.jq的初印象
要想去更好地理解LangCahin中JSONLoader的原理,我们得首先介绍jq。
在我看来,jq类似于JSON数据的sed或awk,是一个轻量级的、灵活的命令行 JSON 处理器。
import jq
# python中导包
2.jq的核心概念
jq 的表达式(我们称之为 jq_schema)就像一个管道,数据从左边流入,经过一系列处理,最终从右边流出。
a.基本过滤器:. (点)
# 输入
{"name": "John", "age": 30}
# jq 表达式
.
# 输出
{"name": "John", "age": 30}
b.对象值访问:.key
# 输入
{"name": "John", "age": 30}
# jq 表达式
.name
# 输出
"John"
c.数组元素访问:.[] 和 .[index]
# 输入
{"users": ["Alice", "Bob", "Charlie"]}
# jq 表达式 1: 展开数组
.users[]
# 输出 1
"Alice"
"Bob"
"Charlie"
# jq 表达式 2: 获取第二个元素
.users[1]
# 输出 2
"Bob"
d.jq操作符:|
这个地方很类似LangChain中管道符号|,它将前一个命令的输出作为后一个命令的输入。
比如以下的json格式
{
"quiz": {
"sport": {
"q1": { "question": "Which one is correct team name in NBA?" }
}
}
}
.quiz | .sport | .q1 | .question
# 逻辑流程:
# 1. .quiz -> 获取 quiz 对象
# 2. | .sport -> 将 quiz 对象传入,获取 sport 对象
# 3. | .q1 -> 将 sport 对象传入,获取 q1 对象
# 4. | .question -> 将 q1 对象传入,获取 question 的值
# 输出
"Which one is correct team name in NBA?"
如果对jq模式感兴趣的话,可以去以下两个地方详细学习。
- 官方手册: https://jqlang.github.io/jq/manual/ (最权威、最全面的资料)
- 在线练习场: https://jqplay.org/ (你可以在网页上输入 JSON 和
jq表达式,实时看到结果,非常适合学习!)
二.JSONLoader源码
建议先自己看一下源码,如果能掌握就可以跳过这篇文章了,不明白的地方再跳转到相应方法实现中看一下我的注释和说明。
import json
from os import PathLike
from pathlib import Path
from typing import Any, Callable, Dict, Iterator, Optional, Union
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
class JSONLoader(BaseLoader):
"""
Load a `JSON` file using a `jq` schema.
Setup:
.. code-block:: bash
pip install -U jq
Instantiate:
.. code-block:: python
from langchain_community.document_loaders import JSONLoader
import json
from pathlib import Path
file_path='./sample_quiz.json'
data = json.loads(Path(file_path).read_text())
loader = JSONLoader(
file_path=file_path,
jq_schema='.quiz',
text_content=False)
Load:
.. code-block:: python
docs = loader.load()
print(docs[0].page_content[:100])
print(docs[0].metadata)
.. code-block:: python
{"sport": {"q1": {"question": "Which one is correct team name in
NBA?", "options": ["New York Bulls"
{'source': '/sample_quiz
.json', 'seq_num': 1}
Async load:
.. code-block:: python
docs = await loader.aload()
print(docs[0].page_content[:100])
print(docs[0].metadata)
.. code-block:: python
{"sport": {"q1": {"question": "Which one is correct team name in
NBA?", "options": ["New York Bulls"
{'source': '/sample_quizg
.json', 'seq_num': 1}
Lazy load:
.. code-block:: python
docs = []
docs_lazy = loader.lazy_load()
# async variant:
# docs_lazy = await loader.alazy_load()
for doc in docs_lazy:
docs.append(doc)
print(docs[0].page_content[:100])
print(docs[0].metadata)
.. code-block:: python
{"sport": {"q1": {"question": "Which one is correct team name in
NBA?", "options": ["New York Bulls"
{'source': '/sample_quiz
.json', 'seq_num': 1}
"""
def __init__(
self,
file_path: Union[str, PathLike],
jq_schema: str,
content_key: Optional[str] = None,
is_content_key_jq_parsable: Optional[bool] = False,
metadata_func: Optional[Callable[[Dict, Dict], Dict]] = None,
text_content: bool = True,
json_lines: bool = False,
):
"""Initialize the JSONLoader.
Args:
file_path (Union[str, PathLike]): The path to the JSON or JSON Lines file.
jq_schema (str): The jq schema to use to extract the data or text from
the JSON.
content_key (str): The key to use to extract the content from
the JSON if the jq_schema results to a list of objects (dict).
If is_content_key_jq_parsable is True, this has to be a jq compatible
schema. If is_content_key_jq_parsable is False, this should be a simple
string key.
is_content_key_jq_parsable (bool): A flag to determine if
content_key is parsable by jq or not. If True, content_key is
treated as a jq schema and compiled accordingly. If False or if
content_key is None, content_key is used as a simple string.
Default is False.
metadata_func (Callable[Dict, Dict]): A function that takes in the JSON
object extracted by the jq_schema and the default metadata and returns
a dict of the updated metadata.
text_content (bool): Boolean flag to indicate whether the content is in
string format, default to True.
json_lines (bool): Boolean flag to indicate whether the input is in
JSON Lines format.
"""
try:
import jq
self.jq = jq
except ImportError:
raise ImportError(
"jq package not found, please install it with `pip install jq`"
)
self.file_path = Path(file_path).resolve()
self._jq_schema = jq.compile(jq_schema)
self._is_content_key_jq_parsable = is_content_key_jq_parsable
self._content_key = content_key
self._metadata_func = metadata_func
self._text_content = text_content
self._json_lines = json_lines
def lazy_load(self) -> Iterator[Document]:
"""Load and return documents from the JSON file."""
index = 0
if self._json_lines:
with self.file_path.open(encoding="utf-8-sig") as f:
for line in f:
line = line.strip()
if line:
for doc in self._parse(line, index):
yield doc
index += 1
else:
for doc in self._parse(
self.file_path.read_text(encoding="utf-8-sig"), index
):
yield doc
index += 1
def _parse(self, content: str, index: int) -> Iterator[Document]:
"""Convert given content to documents."""
data = self._jq_schema.input(json.loads(content))
# Perform some validation
# This is not a perfect validation, but it should catch most cases
# and prevent the user from getting a cryptic error later on.
if self._content_key is not None:
self._validate_content_key(data)
for i, sample in enumerate(data, index + 1):
text = self._get_text(sample=sample)
metadata = self._get_metadata(
sample=sample, source=str(self.file_path), seq_num=i
)
yield Document(page_content=text, metadata=metadata)
def _get_text(self, sample: Any) -> str:
"""Convert sample to string format"""
if self._content_key is not None:
if self._is_content_key_jq_parsable:
compiled_content_key = self.jq.compile(self._content_key)
content = compiled_content_key.input(sample).first()
else:
content = sample[self._content_key]
else:
content = sample
if self._text_content and not isinstance(content, str) and content is not None:
raise ValueError(
f"Expected page_content is string, got {type(content)} instead. \
Set `text_content=False` if the desired input for \
`page_content` is not a string"
)
# In case the text is None, set it to an empty string
elif isinstance(content, str):
return content
elif isinstance(content, (dict, list)):
return json.dumps(content) if content else ""
else:
return str(content) if content is not None else ""
def _get_metadata(
self, sample: Dict[str, Any], **additional_fields: Any
) -> Dict[str, Any]:
"""
Return a metadata dictionary base on the existence of metadata_func
:param sample: single data payload
:param additional_fields: key-word arguments to be added as metadata values
:return:
"""
if self._metadata_func is not None:
result = self._metadata_func(sample, additional_fields)
if not isinstance(result, dict):
raise ValueError(
f"Expected the metadata_func to return a dict but got \
`{type(result)}`"
)
return result
else:
return additional_fields
def _validate_content_key(self, data: Any) -> None:
"""Check if a content key is valid"""
sample = data.first()
if not isinstance(sample, dict):
raise ValueError(
f"Expected the jq schema to result in a list of objects (dict), \
so sample must be a dict but got `{type(sample)}`"
)
if (
not self._is_content_key_jq_parsable
and sample.get(self._content_key) is None
):
raise ValueError(
f"Expected the jq schema to result in a list of objects (dict) \
with the key `{self._content_key}`"
)
if (
self._is_content_key_jq_parsable
and self.jq.compile(self._content_key).input(sample).text() is None
):
raise ValueError(
f"Expected the jq schema to result in a list of objects (dict) \
with the key `{self._content_key}` which should be parsable by jq"
)
三.JSONLoader源码实现流程
现在我们有了jq_schema的印象,开始展开源码的分析。
1.分析参数
def __init__(
self,
file_path: Union[str, PathLike],
jq_schema: str,
content_key: Optional[str] = None,
is_content_key_jq_parsable: Optional[bool] = False,
metadata_func: Optional[Callable[[Dict, Dict], Dict]] = None,
text_content: bool = True,
json_lines: bool = False,
):
try:
import jq
self.jq = jq
except ImportError:
raise ImportError(
"jq is not installed. Please install it with 'pip install jq'."
)
self.file_path = Path(file_path).resolve()
# `.resolve()`: 这是一个关键步骤。它会解析路径中的任何 `.`(当前目录)或 `..`(父目录)符号,并返回一个绝对路径。这消除了路径的歧义,确保无论用户从哪个目录运行脚本,`self.file_path` 都指向一个确定的文件。这对于后续的元数据生成(如 `source`)非常重要。
self._jq_schema = jq.compile(jq_schema)
# `jq.compile()` 会将 `jq_schema` 字符串(如 `'.quiz | to_entries[]'`)解析并编译成一个内部的、可高效执行的查询对象。将这个编译后的对象存储在 `self._jq_schema` 中,意味着我们只在初始化时编译一次。在后续处理数据时,可以直接重用这个编译好的对象,而不需要在每次处理数据行时都重新解析和编译字符串,这对于处理大文件或大量数据时性能提升显著。
self._is_content_key_jq_parsable = is_content_key_jq_parsable
self._content_key = content_key
self._metadata_func = metadata_func
self._text_content = text_content
self._json_lines = json_lines
具体参数只挑几个来讲:
jq_schema:jq查询字符串,定义了如何从 JSON 中提取数据。例如'.quiz'会提取quiz对象下的所有内容。(这个说白了就是刚才讲的jq表达式)content_key: 当jq_schema提取的是一个字典列表时,用这个键来指定哪个字段的值作为Document的page_content。is_content_key_jq_parsable:简单来说,这个参数决定了content_key的“身份”,说明content_key本身是不是一个jq表达式。这个一般都是配合content_key来使用的(当时我看的时候觉得is_content_key_jq_parsable有点绕,等下讲到_parse函数的时候会举实例来说明。metadata_func: 一个可选的函数,用于自定义生成每个Document的元数据。json_lines: 布尔值,如果为True,则输入文件是 JSON Lines 格式(每行一个独立的 JSON 对象)。
2.lazy_load 方法:核心的懒加载逻辑
def lazy_load(self) -> Iterator[Document]:
"""Load and return documents from the JSON file."""
index = 0
# `index` 从 0 开始,每生成一个 `Document`,它就会递增。这个 `index` 会被传递给 `_parse` 方法,用于计算最终的 `seq_num`。
if self._json_lines:
# ... 处理 JSON Lines ...
else:
# ... 处理标准 JSON ...
然后用条件分支处理不同文件格式:
if self._json_lines:
with self.file_path.open(encoding="utf-8-sig") as f:
# `encoding="utf-8-sig"` 是一个很好的实践,它能正确处理 UTF-8 文件开头可能存在的 BOM(字节顺序标记)。
for line in f:
line = line.strip()
if line:
for doc in self._parse(line, index):
yield doc
index += 1
else:
for doc in self._parse(
self.file_path.read_text(encoding="utf-8-sig"), index
):
yield doc
index += 1
如果是JSON Lines的模式,就会开始逐行迭代文件(内存高效,不会把整个json文件一次性读出),去除每行首尾空白字符,调用 `_parse` 方法处理单行 JSON 字符串,将生成的 `Document` 对象“产出”给调用者,最后索引加一。
如果是标准json模式,一次性读取整个文件内容到一个字符串中,将整个 JSON 字符串传递给 `_parse` 方法进行处理,最后索引加一。
3._parse 方法:从原始数据到 Document 的转换
def _parse(self, content: str, index: int) -> Iterator[Document]:
data = self._jq_schema.input(json.loads(content))
# 它将 Python 对象作为输入,执行在 `__init__` 中预编译好的 `jq_schema` 查询。查询结果 `data` 是一个特殊的 `jq` 迭代器对象,包含了所有匹配查询的项。
# ... validation ...(这个地方先填个坑)
for i, sample in enumerate(data, index + 1):
# ...
yield Document(page_content=text, metadata=metadata)
这个地方我们需要生成一个Document:
for i, sample in enumerate(data, index + 1):
text = self._get_text(sample=sample)
metadata = self._get_metadata(
sample=sample, source=str(self.file_path), seq_num=i
)
yield Document(page_content=text, metadata=metadata)
那么这时候我们需要获取text和metadata,所以来设置两个函数来获取。
4._get_text 方法:提取和格式化页面内容
def _get_text(self, sample: Any) -> str:
if self._content_key is not None:
# ... logic with content_key ...
else:
content = sample
# ... formatting logic ...
首先要决定内容来源:
if self._content_key is not None:
# 如 `content_key` 存在**:
# `is_content_key_jq_parsable` 为 `True`**: `content_key` 本身就是一个 `jq` 表达式(如 `'.options[0]'`)。代码会即时编译这个表达式,并在 `sample` 上执行它,获取第一个匹配结果 `.first()`。这提供了极大的灵活性。
# `is_content_key_jq_parsable` 为 `False`**: `content_key` 只是一个普通的字典键(如 `'question'`)。代码直接通过 `sample[self._content_key]` 来获取值。这是最常见和最高效的方式。
if self._is_content_key_jq_parsable:
compiled_content_key = self.jq.compile(self._content_key)
content = compiled_content_key.input(sample).first()
else:
content = sample[self._content_key]
上面这段代码初步看会有点懵,现在来举个例子:
# 这是一个名字叫sample_quiz.json的json文件
{ "quiz": { "sport": { "q1": { "question": "Which one is correct team name in NBA?", "options": [ "New York Bulls", "Los Angeles Kings", "Golden State Warriros", "Huston Rocket" ], "answer": "Huston Rocket" }, "q2": { "question": "Which country won the FIFA World Cup in 2018?", "options": ["France", "Croatia", "Russia", "Brazil"], "answer": "France" } }, "maths": { "q1": { "question": "What is 5 + 7?", "options": ["10", "11", "12", "13"], "answer": "12" }, "q2": { "question": "What is the square root of 81?", "options": ["7", "8", "9", "10"], "answer": "9" } } } }情况一:
is_content_key_jq_parsable = False(默认用法)# 提取问题 loader = JSONLoader( file_path='./sample_quiz.json', jq_schema='.quiz | to_entries[] | .value | to_entries[] | .value', content_key='question', # 这是一个简单的字符串键 # is_content_key_jq_parsable 默认为 False ) doc = loader.load()[0] print(doc.page_content) # 输出:Which one is correct team name in NBA? # _get_text 方法看到 is_content_key_jq_parsable 是 False。 # 它执行 sample['question'],成功获取到值。tips:这里有人可能对.quiz | to_entries[] | .value | to_entries[] | .value有疑问
讲一下.quiz | to_entries[]的流程:
(.quiz)
- 输入: 整个 JSON
- 输出:
{"sport": {...}, "maths": {...}}- 作用:拿到
quiz对象。
| to_entries[](作用:将 `sport` 和 `maths` 两个部分分离出来,方便后续处理)
- 输入:
{"sport": {...}, "maths": {...}}to_entries将对象转换为键值对数组:[{"key": "sport", "value": {...}}, {"key": "maths", "value": {...}}][]展开这个数组。输出:
{"key": "sport", "value": {"q1": {...}, "q2": {...}}}
{"key": "maths", "value": {"q1": {...}, "q2": {...}}}后面的过程类似。
情况二:
is_content_key_jq_parsable = True(高级用法)如果我们想提取第一个选项呢?
content_key应该是'options',但这会返回整个列表["New York Bulls", ...]。我们只想要第一个"New York Bulls"。用简单的键content_key='options[0]'是行不通的,因为 Python 字典没有叫'options[0]'的键。它会报KeyError。这时,
is_content_key_jq_parsable = True就派上用场了。当我们需要的数据不是顶层键,或者需要进一步处理(如取数组元素、合并字段等)时,就要用它。
用一个表格来说明is_content_key_jq_parsable是 模式切换开关# 提取第一个选项 loader = JSONLoader( file_path='./sample_quiz.json', jq_schema='.quiz | to_entries[] | .value | to_entries[] | .value', content_key='.options[0]', # 这是一个 jq 表达式! is_content_key_jq_parsable = True # 必须设置为 True! ) doc = loader.load()[0] print(doc.page_content) # 输出:New York Bulls # _get_text 方法看到 is_content_key_jq_parsable 是 True。 # 它把 content_key 的值 '.options[0]' 当作一个 jq 脚本。 # 它执行 jq.compile('.options[0]'),然后在 sample 对象上运行这个脚本。 # jq 成功地从 options 数组中取出了索引为 0 的元素。
参数值 content_key的身份能力 例子 类比 False字典键 只能取顶层键的值 'question'你有一个带标签的抽屉柜,你只能通过标签( 'question')直接拉开整个抽屉。Truejq查询语句能取任意路径、数组元素、计算值 '.options[0]'你不仅有抽屉柜,还有一把万能钥匙( jq)。你可以用它打开抽屉,再从抽屉里的盒子(options)中拿出第一个物品([0])。
我们再回到的_get_text方法设计中,在决定内容来源后,进行内容格式化与类型检查:
if self._text_content and not isinstance(content, str) and content is not None:
raise ValueError(...)
# 这是一个严格的类型检查。如果用户明确指定 `text_content=True`(默认值),那么如果提取出的 `content` 不是字符串(且不是 `None`),就会抛出 `ValueError`。这可以防止用户意外地将非文本数据传递给下游的文本处理模型。
elif isinstance(content, str):
return content
# 如果 `content` 已经是字符串,直接返回。
elif isinstance(content, (dict, list)):
return json.dumps(content) if content else ""
# 如果 `content` 是字典或列表,使用 `json.dumps(content)` 将其序列化为 JSON 格式的字符串。如果内容为空(`{}` 或 `[]`),则返回空字符串 `""`。
else:
return str(content) if content is not None else ""
# 兜底逻辑。对于其他任何类型(如数字、布尔值),使用 `str(content)` 转换为字符串。如果 `content` 是 `None`,也返回空字符串 `""`。这确保了方法的健壮性,总能返回一个有效的字符串。
5._get_metadata 方法:生成元数据
def _get_metadata(
self, sample: Dict[str, Any], **additional_fields: Any
) -> Dict[str, Any]:
if self._metadata_func is not None:
# 判断用户是否提供了自定义的元数据处理逻辑。
result = self._metadata_func(sample, additional_fields)
# ... validation ...
return result
else:
return additional_fields # 直接返回包含 `source`(文件路径)和 `seq_num`(文档序号)的默认字典。
这里就涉及到metadata_fun的参数,可以自定义元数据,举个例子:
依然是用刚才的json文件# 在 main.py 中添加以下代码 # --- 场景3: 使用 metadata_func 自定义元数据 --- print("--- metadata_func 自定义元数据 ---") def simple_metadata_func(record: dict, metadata: dict) -> dict: """从 record 中提取 answer 添加到 metadata""" metadata["answer"] = record.get("answer") return metadata # 这个场景的 jq_schema 需要返回包含 question 和 answer 的完整对象 loader_custom_meta = JSONLoader( file_path=file_path, jq_schema='.quiz | to_entries[] | .value | to_entries[] | .value', content_key='question', metadata_func=simple_metadata_func, text_content=True ) docs_custom_meta = loader_custom_meta.load() print(f"加载了 {len(docs_custom_meta)} 个文档。") for doc in docs_custom_meta: print(f"内容: {doc.page_content}") print(f"元数据: {doc.metadata}") print("-" * 10) print("-" * 20) """输出内容 --- 使用 metadata_func 自定义元数据 --- 加载了 4 个文档。 内容: Which one is correct team name in NBA? 元数据: {'source': '/path/to/your/project/json_loader_project/sample_quiz.json', 'seq_num': 1, 'answer': 'Huston Rocket'} ---------- 内容: Which country won the FIFA World Cup in 2018? 元数据: {'source': '/path/to/your/project/json_loader_project/sample_quiz.json', 'seq_num': 2, 'answer': 'France'} ---------- 内容: What is 5 + 7? 元数据: {'source': '/path/to/your/project/json_loader_project/sample_quiz.json', 'seq_num': 3, 'answer': '12'} ---------- 内容: What is the square root of 81? 元数据: {'source': '/path/to/your/project/json_loader_project/sample_quiz.json', 'seq_num': 4, 'answer': '9'} ---------- -------------------- """
6._validate_content_key 方法:早期验证逻辑
这就是刚才_parse中填的坑:
def _validate_content_key(self, data: Any) -> None:
sample = data.first()
# `data` 是一个 `jq` 迭代器。`.first()` 方法获取第一个元素。这是一种高效的验证方式,因为它不需要遍历所有数据。
if not isinstance(sample, dict):
raise ValueError(...)
# 如果 `content_key` 存在,意味着用户想从字典中按键取值。如果 `jq` 查询结果的第一个元素不是字典,那么这个假设就不成立,立即报错。
if (
not self._is_content_key_jq_parsable
and sample.get(self._content_key) is None
):
raise ValueError(...)
if (
self._is_content_key_jq_parsable
and self.jq.compile(self._content_key).input(sample).text() is None
):
raise ValueError(...)
# 确保指定的 content_key 在样本字典中是可获取的。
这篇是LangChain源码分析的第一次分析(主要感觉他的很多处理健壮性很强,自己比较难以想到)。可能会比较冗余,也可能一些细节地方没有解释明白。如果对这种类型文章感兴趣的话,可以给我提一点建议,我一定会认真思考,做出改变,谢谢!
更多推荐



所有评论(0)