一.jq_schema(前置学习)

1.jq的初印象

要想去更好地理解LangCahin中JSONLoader的原理,我们得首先介绍jq。

在我看来,jq类似于JSON数据的sedawk,是一个轻量级的、灵活的命令行 JSON 处理器。

import jq
# python中导包

2.jq的核心概念

jq 的表达式(我们称之为 jq_schema)就像一个管道,数据从左边流入,经过一系列处理,最终从右边流出。

a.基本过滤器:. (点)

# 输入
{"name": "John", "age": 30}

# jq 表达式
.

# 输出
{"name": "John", "age": 30}

b.对象值访问:.key

# 输入
{"name": "John", "age": 30}

# jq 表达式
.name

# 输出
"John"

c.数组元素访问:.[] 和 .[index]

# 输入
{"users": ["Alice", "Bob", "Charlie"]}

# jq 表达式 1: 展开数组
.users[]

# 输出 1
"Alice"
"Bob"
"Charlie"

# jq 表达式 2: 获取第二个元素
.users[1]

# 输出 2
"Bob"

d.jq操作符:|

这个地方很类似LangChain中管道符号|,它将前一个命令的输出作为后一个命令的输入。

比如以下的json格式
{
  "quiz": {
    "sport": {
      "q1": { "question": "Which one is correct team name in NBA?" }
    }
  }
}

.quiz | .sport | .q1 | .question

# 逻辑流程:
# 1. .quiz -> 获取 quiz 对象
# 2. | .sport -> 将 quiz 对象传入,获取 sport 对象
# 3. | .q1 -> 将 sport 对象传入,获取 q1 对象
# 4. | .question -> 将 q1 对象传入,获取 question 的值

# 输出
"Which one is correct team name in NBA?"

如果对jq模式感兴趣的话,可以去以下两个地方详细学习。

二.JSONLoader源码

建议先自己看一下源码,如果能掌握就可以跳过这篇文章了,不明白的地方再跳转到相应方法实现中看一下我的注释和说明。

import json
from os import PathLike
from pathlib import Path
from typing import Any, Callable, Dict, Iterator, Optional, Union

from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader


class JSONLoader(BaseLoader):
    """
    Load a `JSON` file using a `jq` schema.

    Setup:
        .. code-block:: bash

            pip install -U jq

    Instantiate:
        .. code-block:: python

            from langchain_community.document_loaders import JSONLoader
            import json
            from pathlib import Path

            file_path='./sample_quiz.json'
            data = json.loads(Path(file_path).read_text())
            loader = JSONLoader(
                     file_path=file_path,
                     jq_schema='.quiz',
                     text_content=False)

    Load:
        .. code-block:: python

            docs = loader.load()
            print(docs[0].page_content[:100])
            print(docs[0].metadata)

        .. code-block:: python

            {"sport": {"q1": {"question": "Which one is correct team name in
            NBA?", "options": ["New York Bulls"
            {'source': '/sample_quiz
            .json', 'seq_num': 1}

    Async load:
        .. code-block:: python

            docs = await loader.aload()
            print(docs[0].page_content[:100])
            print(docs[0].metadata)

        .. code-block:: python

            {"sport": {"q1": {"question": "Which one is correct team name in
            NBA?", "options": ["New York Bulls"
            {'source': '/sample_quizg
            .json', 'seq_num': 1}

    Lazy load:
        .. code-block:: python

            docs = []
            docs_lazy = loader.lazy_load()

            # async variant:
            # docs_lazy = await loader.alazy_load()

            for doc in docs_lazy:
                docs.append(doc)
            print(docs[0].page_content[:100])
            print(docs[0].metadata)

        .. code-block:: python

            {"sport": {"q1": {"question": "Which one is correct team name in
            NBA?", "options": ["New York Bulls"
            {'source': '/sample_quiz
            .json', 'seq_num': 1}
    """

    def __init__(
        self,
        file_path: Union[str, PathLike],
        jq_schema: str,
        content_key: Optional[str] = None,
        is_content_key_jq_parsable: Optional[bool] = False,
        metadata_func: Optional[Callable[[Dict, Dict], Dict]] = None,
        text_content: bool = True,
        json_lines: bool = False,
    ):
        """Initialize the JSONLoader.

        Args:
            file_path (Union[str, PathLike]): The path to the JSON or JSON Lines file.
            jq_schema (str): The jq schema to use to extract the data or text from
                the JSON.
            content_key (str): The key to use to extract the content from
                the JSON if the jq_schema results to a list of objects (dict).
                If is_content_key_jq_parsable is True, this has to be a jq compatible
                schema. If is_content_key_jq_parsable is False, this should be a simple
                string key.
            is_content_key_jq_parsable (bool): A flag to determine if
                content_key is parsable by jq or not. If True, content_key is
                treated as a jq schema and compiled accordingly. If False or if
                content_key is None, content_key is used as a simple string.
                Default is False.
            metadata_func (Callable[Dict, Dict]): A function that takes in the JSON
                object extracted by the jq_schema and the default metadata and returns
                a dict of the updated metadata.
            text_content (bool): Boolean flag to indicate whether the content is in
                string format, default to True.
            json_lines (bool): Boolean flag to indicate whether the input is in
                JSON Lines format.
        """
        try:
            import jq

            self.jq = jq
        except ImportError:
            raise ImportError(
                "jq package not found, please install it with `pip install jq`"
            )

        self.file_path = Path(file_path).resolve()
        self._jq_schema = jq.compile(jq_schema)
        self._is_content_key_jq_parsable = is_content_key_jq_parsable
        self._content_key = content_key
        self._metadata_func = metadata_func
        self._text_content = text_content
        self._json_lines = json_lines

    def lazy_load(self) -> Iterator[Document]:
        """Load and return documents from the JSON file."""
        index = 0
        if self._json_lines:
            with self.file_path.open(encoding="utf-8-sig") as f:
                for line in f:
                    line = line.strip()
                    if line:
                        for doc in self._parse(line, index):
                            yield doc
                            index += 1
        else:
            for doc in self._parse(
                self.file_path.read_text(encoding="utf-8-sig"), index
            ):
                yield doc
                index += 1

    def _parse(self, content: str, index: int) -> Iterator[Document]:
        """Convert given content to documents."""
        data = self._jq_schema.input(json.loads(content))

        # Perform some validation
        # This is not a perfect validation, but it should catch most cases
        # and prevent the user from getting a cryptic error later on.
        if self._content_key is not None:
            self._validate_content_key(data)

        for i, sample in enumerate(data, index + 1):
            text = self._get_text(sample=sample)
            metadata = self._get_metadata(
                sample=sample, source=str(self.file_path), seq_num=i
            )
            yield Document(page_content=text, metadata=metadata)

    def _get_text(self, sample: Any) -> str:
        """Convert sample to string format"""
        if self._content_key is not None:
            if self._is_content_key_jq_parsable:
                compiled_content_key = self.jq.compile(self._content_key)
                content = compiled_content_key.input(sample).first()
            else:
                content = sample[self._content_key]
        else:
            content = sample

        if self._text_content and not isinstance(content, str) and content is not None:
            raise ValueError(
                f"Expected page_content is string, got {type(content)} instead. \
                    Set `text_content=False` if the desired input for \
                    `page_content` is not a string"
            )

        # In case the text is None, set it to an empty string
        elif isinstance(content, str):
            return content
        elif isinstance(content, (dict, list)):
            return json.dumps(content) if content else ""
        else:
            return str(content) if content is not None else ""

    def _get_metadata(
        self, sample: Dict[str, Any], **additional_fields: Any
    ) -> Dict[str, Any]:
        """
        Return a metadata dictionary base on the existence of metadata_func
        :param sample: single data payload
        :param additional_fields: key-word arguments to be added as metadata values
        :return:
        """
        if self._metadata_func is not None:
            result = self._metadata_func(sample, additional_fields)
            if not isinstance(result, dict):
                raise ValueError(
                    f"Expected the metadata_func to return a dict but got \
                                `{type(result)}`"
                )
            return result
        else:
            return additional_fields

    def _validate_content_key(self, data: Any) -> None:
        """Check if a content key is valid"""

        sample = data.first()
        if not isinstance(sample, dict):
            raise ValueError(
                f"Expected the jq schema to result in a list of objects (dict), \
                    so sample must be a dict but got `{type(sample)}`"
            )

        if (
            not self._is_content_key_jq_parsable
            and sample.get(self._content_key) is None
        ):
            raise ValueError(
                f"Expected the jq schema to result in a list of objects (dict) \
                    with the key `{self._content_key}`"
            )
        if (
            self._is_content_key_jq_parsable
            and self.jq.compile(self._content_key).input(sample).text() is None
        ):
            raise ValueError(
                f"Expected the jq schema to result in a list of objects (dict) \
                    with the key `{self._content_key}` which should be parsable by jq"
            )

三.JSONLoader源码实现流程

现在我们有了jq_schema的印象,开始展开源码的分析。

1.分析参数

def __init__(
    self,
    file_path: Union[str, PathLike],
    jq_schema: str,
    content_key: Optional[str] = None,
    is_content_key_jq_parsable: Optional[bool] = False,
    metadata_func: Optional[Callable[[Dict, Dict], Dict]] = None,
    text_content: bool = True,
    json_lines: bool = False,
):
    try:
        import jq
        self.jq = jq
    except ImportError:
        raise ImportError(
            "jq is not installed. Please install it with 'pip install jq'."
        )

    self.file_path = Path(file_path).resolve()
    # `.resolve()`: 这是一个关键步骤。它会解析路径中的任何 `.`(当前目录)或 `..`(父目录)符号,并返回一个绝对路径。这消除了路径的歧义,确保无论用户从哪个目录运行脚本,`self.file_path` 都指向一个确定的文件。这对于后续的元数据生成(如 `source`)非常重要。
    self._jq_schema = jq.compile(jq_schema)
    # `jq.compile()` 会将 `jq_schema` 字符串(如 `'.quiz | to_entries[]'`)解析并编译成一个内部的、可高效执行的查询对象。将这个编译后的对象存储在 `self._jq_schema` 中,意味着我们只在初始化时编译一次。在后续处理数据时,可以直接重用这个编译好的对象,而不需要在每次处理数据行时都重新解析和编译字符串,这对于处理大文件或大量数据时性能提升显著。
    self._is_content_key_jq_parsable = is_content_key_jq_parsable
    self._content_key = content_key
    self._metadata_func = metadata_func
    self._text_content = text_content
    self._json_lines = json_lines

具体参数只挑几个来讲:

  • jq_schemajq 查询字符串,定义了如何从 JSON 中提取数据。例如 '.quiz' 会提取 quiz 对象下的所有内容。(这个说白了就是刚才讲的jq表达式)
  • content_key: 当 jq_schema 提取的是一个字典列表时,用这个键来指定哪个字段的值作为 Document 的 page_content
  • is_content_key_jq_parsable:简单来说,这个参数决定了 content_key 的“身份”,说明 content_key 本身是不是一个 jq 表达式。这个一般都是配合content_key来使用的当时我看的时候觉得is_content_key_jq_parsable有点绕,等下讲到_parse函数的时候会举实例来说明。
  • metadata_func: 一个可选的函数,用于自定义生成每个 Document 的元数据。
  • json_lines: 布尔值,如果为 True,则输入文件是 JSON Lines 格式(每行一个独立的 JSON 对象)。

2.lazy_load 方法:核心的懒加载逻辑

def lazy_load(self) -> Iterator[Document]:
    """Load and return documents from the JSON file."""
    index = 0
    # `index` 从 0 开始,每生成一个 `Document`,它就会递增。这个 `index` 会被传递给 `_parse` 方法,用于计算最终的 `seq_num`。

    if self._json_lines:
        # ... 处理 JSON Lines ...
    else:
        # ... 处理标准 JSON ...

然后用条件分支处理不同文件格式:

    if self._json_lines:
        
        with self.file_path.open(encoding="utf-8-sig") as f:
            # `encoding="utf-8-sig"` 是一个很好的实践,它能正确处理 UTF-8 文件开头可能存在的 BOM(字节顺序标记)。
            for line in f:
                line = line.strip()
                if line:
                    for doc in self._parse(line, index):
                        yield doc
                        index += 1
    else:
        for doc in self._parse(
            self.file_path.read_text(encoding="utf-8-sig"), index
        ):
            yield doc
            index += 1
    

如果是JSON Lines的模式,就会开始逐行迭代文件(内存高效,不会把整个json文件一次性读出),去除每行首尾空白字符,调用 `_parse` 方法处理单行 JSON 字符串,将生成的 `Document` 对象“产出”给调用者,最后索引加一。

如果是标准json模式,一次性读取整个文件内容到一个字符串中,将整个 JSON 字符串传递给 `_parse` 方法进行处理,最后索引加一。

3._parse 方法:从原始数据到 Document 的转换

def _parse(self, content: str, index: int) -> Iterator[Document]:
    data = self._jq_schema.input(json.loads(content))
    # 它将 Python 对象作为输入,执行在 `__init__` 中预编译好的 `jq_schema` 查询。查询结果 `data` 是一个特殊的 `jq` 迭代器对象,包含了所有匹配查询的项。
    # ... validation ...(这个地方先填个坑)
    for i, sample in enumerate(data, index + 1):
        # ...
        yield Document(page_content=text, metadata=metadata)

这个地方我们需要生成一个Document:

    for i, sample in enumerate(data, index + 1):
        text = self._get_text(sample=sample)
        metadata = self._get_metadata(
            sample=sample, source=str(self.file_path), seq_num=i
        )
        yield Document(page_content=text, metadata=metadata)
    

那么这时候我们需要获取text和metadata,所以来设置两个函数来获取。

4._get_text 方法:提取和格式化页面内容

def _get_text(self, sample: Any) -> str:
    if self._content_key is not None:
        # ... logic with content_key ...
    else:
        content = sample
    # ... formatting logic ...

首先要决定内容来源:

    if self._content_key is not None:
        # 如 `content_key` 存在**:
        # `is_content_key_jq_parsable` 为 `True`**: `content_key` 本身就是一个 `jq` 表达式(如 `'.options[0]'`)。代码会即时编译这个表达式,并在 `sample` 上执行它,获取第一个匹配结果 `.first()`。这提供了极大的灵活性。
        # `is_content_key_jq_parsable` 为 `False`**: `content_key` 只是一个普通的字典键(如 `'question'`)。代码直接通过 `sample[self._content_key]` 来获取值。这是最常见和最高效的方式。
        if self._is_content_key_jq_parsable:
            compiled_content_key = self.jq.compile(self._content_key)
            content = compiled_content_key.input(sample).first()
        else:
            content = sample[self._content_key]

上面这段代码初步看会有点懵,现在来举个例子:

# 这是一个名字叫sample_quiz.json的json文件

{
  "quiz": {
    "sport": {
      "q1": {
        "question": "Which one is correct team name in NBA?",
        "options": [
          "New York Bulls",
          "Los Angeles Kings",
          "Golden State Warriros",
          "Huston Rocket"
        ],
        "answer": "Huston Rocket"
      },
      "q2": {
        "question": "Which country won the FIFA World Cup in 2018?",
        "options": ["France", "Croatia", "Russia", "Brazil"],
        "answer": "France"
      }
    },
    "maths": {
      "q1": {
        "question": "What is 5 + 7?",
        "options": ["10", "11", "12", "13"],
        "answer": "12"
      },
      "q2": {
        "question": "What is the square root of 81?",
        "options": ["7", "8", "9", "10"],
        "answer": "9"
      }
    }
  }
}

情况一:is_content_key_jq_parsable = False (默认用法)

# 提取问题
loader = JSONLoader(
    file_path='./sample_quiz.json',
    jq_schema='.quiz | to_entries[] | .value | to_entries[] | .value',
    content_key='question',  # 这是一个简单的字符串键
    # is_content_key_jq_parsable 默认为 False
)

doc = loader.load()[0]
print(doc.page_content)

# 输出:Which one is correct team name in NBA?
# _get_text 方法看到 is_content_key_jq_parsable 是 False。
# 它执行 sample['question'],成功获取到值。

tips:这里有人可能对.quiz | to_entries[] | .value | to_entries[] | .value有疑问

讲一下.quiz | to_entries[]的流程:

  1. (.quiz)

    • 输入: 整个 JSON
    • 输出: {"sport": {...}, "maths": {...}}
    • 作用:拿到 quiz 对象。
  2. | to_entries[](作用:将 `sport` 和 `maths` 两个部分分离出来,方便后续处理

    • 输入: {"sport": {...}, "maths": {...}}
    • to_entries 将对象转换为键值对数组:[{"key": "sport", "value": {...}}, {"key": "maths", "value": {...}}]
    • [] 展开这个数组。

输出:

        {"key": "sport", "value": {"q1": {...}, "q2": {...}}}
        {"key": "maths", "value": {"q1": {...}, "q2": {...}}}

后面的过程类似。
        

情况二:is_content_key_jq_parsable = True (高级用法)

如果我们想提取第一个选项呢?
content_key 应该是 'options',但这会返回整个列表 ["New York Bulls", ...]。我们只想要第一个 "New York Bulls"。用简单的键 content_key='options[0]' 是行不通的,因为 Python 字典没有叫 'options[0]' 的键。它会报 KeyError

这时,is_content_key_jq_parsable = True 就派上用场了。

当我们需要的数据不是顶层键,或者需要进一步处理(如取数组元素、合并字段等)时,就要用它。

# 提取第一个选项
loader = JSONLoader(
    file_path='./sample_quiz.json',
    jq_schema='.quiz | to_entries[] | .value | to_entries[] | .value',
    content_key='.options[0]',  # 这是一个 jq 表达式!
    is_content_key_jq_parsable = True # 必须设置为 True!
)

doc = loader.load()[0]
print(doc.page_content)

# 输出:New York Bulls
# _get_text 方法看到 is_content_key_jq_parsable 是 True。
# 它把 content_key 的值 '.options[0]' 当作一个 jq 脚本。
# 它执行 jq.compile('.options[0]'),然后在 sample 对象上运行这个脚本。
# jq 成功地从 options 数组中取出了索引为 0 的元素。


用一个表格来说明is_content_key_jq_parsable是 模式切换开关
参数值 content_key 的身份 能力 例子 类比
False 字典键 只能取顶层键的值 'question' 你有一个带标签的抽屉柜,你只能通过标签('question')直接拉开整个抽屉。
True jq 查询语句 能取任意路径、数组元素、计算值 '.options[0]' 你不仅有抽屉柜,还有一把万能钥匙(jq)。你可以用它打开抽屉,再从抽屉里的盒子(options)中拿出第一个物品([0])。

我们再回到的_get_text方法设计中,在决定内容来源后,进行内容格式化与类型检查:

    if self._text_content and not isinstance(content, str) and content is not None:
        raise ValueError(...)
# 这是一个严格的类型检查。如果用户明确指定 `text_content=True`(默认值),那么如果提取出的 `content` 不是字符串(且不是 `None`),就会抛出 `ValueError`。这可以防止用户意外地将非文本数据传递给下游的文本处理模型。

    elif isinstance(content, str):
        return content
# 如果 `content` 已经是字符串,直接返回。

    elif isinstance(content, (dict, list)):
        return json.dumps(content) if content else ""
# 如果 `content` 是字典或列表,使用 `json.dumps(content)` 将其序列化为 JSON 格式的字符串。如果内容为空(`{}` 或 `[]`),则返回空字符串 `""`。

    else:
        return str(content) if content is not None else ""
# 兜底逻辑。对于其他任何类型(如数字、布尔值),使用 `str(content)` 转换为字符串。如果 `content` 是 `None`,也返回空字符串 `""`。这确保了方法的健壮性,总能返回一个有效的字符串。
    

5._get_metadata 方法:生成元数据

def _get_metadata(
    self, sample: Dict[str, Any], **additional_fields: Any
) -> Dict[str, Any]:
    if self._metadata_func is not None:
    # 判断用户是否提供了自定义的元数据处理逻辑。
        result = self._metadata_func(sample, additional_fields)
        # ... validation ...
        return result
    else:
        return additional_fields # 直接返回包含 `source`(文件路径)和 `seq_num`(文档序号)的默认字典。

这里就涉及到metadata_fun的参数,可以自定义元数据,举个例子:

依然是用刚才的json文件

# 在 main.py 中添加以下代码

# --- 场景3: 使用 metadata_func 自定义元数据 ---
print("--- metadata_func 自定义元数据 ---")


def simple_metadata_func(record: dict, metadata: dict) -> dict:
    """从 record 中提取 answer 添加到 metadata"""
    metadata["answer"] = record.get("answer")
    return metadata

# 这个场景的 jq_schema 需要返回包含 question 和 answer 的完整对象
loader_custom_meta = JSONLoader(
    file_path=file_path,
    jq_schema='.quiz | to_entries[] | .value | to_entries[] | .value',
    content_key='question',
    metadata_func=simple_metadata_func,
    text_content=True
)

docs_custom_meta = loader_custom_meta.load()
print(f"加载了 {len(docs_custom_meta)} 个文档。")
for doc in docs_custom_meta:
    print(f"内容: {doc.page_content}")
    print(f"元数据: {doc.metadata}")
    print("-" * 10)
print("-" * 20)


"""输出内容
 --- 使用 metadata_func 自定义元数据 ---
加载了 4 个文档。
内容: Which one is correct team name in NBA?
元数据: {'source': '/path/to/your/project/json_loader_project/sample_quiz.json', 'seq_num': 1, 'answer': 'Huston Rocket'}
----------
内容: Which country won the FIFA World Cup in 2018?
元数据: {'source': '/path/to/your/project/json_loader_project/sample_quiz.json', 'seq_num': 2, 'answer': 'France'}
----------
内容: What is 5 + 7?
元数据: {'source': '/path/to/your/project/json_loader_project/sample_quiz.json', 'seq_num': 3, 'answer': '12'}
----------
内容: What is the square root of 81?
元数据: {'source': '/path/to/your/project/json_loader_project/sample_quiz.json', 'seq_num': 4, 'answer': '9'}
----------
--------------------
"""

6._validate_content_key 方法:早期验证逻辑

这就是刚才_parse中填的坑:

def _validate_content_key(self, data: Any) -> None:
    sample = data.first()
    # `data` 是一个 `jq` 迭代器。`.first()` 方法获取第一个元素。这是一种高效的验证方式,因为它不需要遍历所有数据。
    if not isinstance(sample, dict):
        raise ValueError(...)
    # 如果 `content_key` 存在,意味着用户想从字典中按键取值。如果 `jq` 查询结果的第一个元素不是字典,那么这个假设就不成立,立即报错。
    if (
        not self._is_content_key_jq_parsable
        and sample.get(self._content_key) is None
    ):
        raise ValueError(...)
    if (
        self._is_content_key_jq_parsable
        and self.jq.compile(self._content_key).input(sample).text() is None
    ):
        raise ValueError(...)
    # 确保指定的 content_key 在样本字典中是可获取的。

这篇是LangChain源码分析的第一次分析(主要感觉他的很多处理健壮性很强,自己比较难以想到)。可能会比较冗余,也可能一些细节地方没有解释明白。如果对这种类型文章感兴趣的话,可以给我提一点建议,我一定会认真思考,做出改变,谢谢!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐