一、概述

Apache Paimon 是一个流批一体的数据湖存储系统,为了提升查询性能,它提供了强大的文件索引机制。文件索引(File Index)是 Paimon 在读取端通过建立索引来过滤数据文件的核心特性,能够显著加速查询执行。本文将深入分析 Paimon 文件索引的实现原理,特别是以 Bitmap 索引为例,从索引文件格式、索引创建流程、查询执行过程三个维度进行详细解读。 1

二、文件索引整体架构

2.1 文件索引的基本概念

Paimon 的文件索引是一种外部索引文件,为每个数据文件创建对应的索引文件。当索引文件较小时,它会直接存储在 manifest 中;当索引文件较大时,则存储在数据文件所在的目录中。每个数据文件对应一个索引文件,索引文件有独立的文件定义,可以包含多个列的不同类型索引。 2

2.2 支持的索引类型

Paimon 目前支持以下几种文件索引类型:

  1. BloomFilter 索引:适用于点查询场景,通过概率数据结构快速判断某个值是否可能存在
  2. Bitmap 索引:适用于低基数列的精确过滤,占用空间较大但准确度高
  3. Range Bitmap 索引:支持范围查询和高基数场景,使用位切片索引技术
  4. BSI 索引(已废弃):数值范围索引,已被 Range Bitmap 替代 3

2.3 文件索引的整体格式

文件索引采用统一的文件格式,包含 HEAD 和 BODY 两部分。HEAD 部分存储所有列和索引的元数据信息(列名、索引类型、偏移量、长度等),BODY 部分存储实际的索引数据。 4

文件索引格式的关键特点:

  • Magic Number1493475289347502L,用于识别文件类型
  • Version:当前版本为 V_1
  • Head Length:头部长度,用于快速定位 BODY 部分
  • Column Number:包含的列数量
  • Index Metadata:每个列的索引元数据,包括索引名称、起始位置和长度
  • 字节序:所有整数和长整数使用 BIG_ENDIAN 字节序 5

三、Bitmap 索引详解

3.1 Bitmap 索引的适用场景

Bitmap 索引特别适合以下场景:

  • 低基数列(如性别、状态、类别等)
  • 精确匹配查询(=INNOT IN
  • NULL 值查询(IS NULLIS NOT NULL
  • 需要进行 AND/OR 组合查询的场景

Bitmap 索引支持的数据类型包括:TinyIntType、SmallIntType、IntType、BigIntType、DateType、TimeType、LocalZonedTimestampType、TimestampType、CharType、VarCharType、StringType、BooleanType。 6

3.2 Bitmap 索引文件格式

Paimon 的 Bitmap 索引有两个版本:V1(遗留版本)和 V2(当前推荐版本)。

3.2.1 Bitmap 索引 V1 格式

V1 格式较为简单,将所有的值到偏移量的映射都存储在头部: 7

V1 格式的主要组成部分:

  • version:1 字节,值为 1
  • row count:4 字节整数,数据文件的总行数
  • non-null value bitmap number:4 字节整数,非空值的 bitmap 数量
  • has null value:1 字节,标识是否有 NULL 值
  • null value offset:4 字节整数(如果有 NULL 值),NULL bitmap 的偏移量
  • value-offset 映射:每个不同的值及其 bitmap 的偏移量
  • serialized bitmaps:序列化的 RoaringBitmap32 数据
3.2.2 Bitmap 索引 V2 格式(推荐)

当 Bitmap 索引列的基数较高时,使用 V1 格式会导致读取整个字典的时间较长。V2 格式通过在字典上创建二级索引来解决这个问题,提高了高基数列上的谓词命中性能。 8

V2 格式引入了 Bitmap Index Block 的概念,将所有的值分组到多个索引块中,每个索引块的大小可以通过 file-index.bitmap.<column_name>.index-block-size 配置(默认 16KB)。 9

V2 格式的主要改进:

  1. 二级索引:通过 Bitmap Index Block 实现快速定位
  2. 延迟加载:只在需要时才加载对应的索引块
  3. 更好的高基数支持:避免读取完整字典

索引块(Index Block)的格式:

  • entry number:4 字节整数,块内条目数量
  • value-offset-length 元组:每个值的元数据,包括值本身、在 bitmap 区的偏移量和长度

3.3 Bitmap 索引的优化技术

3.3.1 单值优化

当某个值在数据文件中只出现一次时,Paimon 不会为其创建完整的 bitmap,而是使用负数编码来表示该值的唯一位置。这种优化可以显著减少索引文件的大小。 10

具体实现:当 bitmap 的基数为 1 时,偏移量存储为 -1 - position,其中 position 是该值在数据文件中的行号。

3.3.2 RoaringBitmap32 压缩

Paimon 使用 RoaringBitmap32 作为底层 bitmap 实现,这是一种高效的压缩 bitmap 数据结构,能够在保持高性能的同时大幅减少内存和存储占用。 11

四、Bitmap 索引的创建流程

4.1 索引创建的整体流程

创建 Bitmap 索引的流程涉及多个组件的协作:

  1. 配置解析:从表属性中解析索引配置
  2. Writer 初始化:为每个需要索引的列创建 IndexMaintainer
  3. 数据写入:在数据写入过程中同步构建索引
  4. 索引序列化:将索引序列化为字节数组
  5. 索引存储:根据大小选择嵌入 manifest 或独立存储 12

4.2 创建索引的示例

4.2.1 使用 Spark SQL 创建带 Bitmap 索引的表
-- 创建包含 Bitmap 索引的表
CREATE TABLE user_events (
    user_id INT,
    event_type STRING,
    event_time TIMESTAMP,
    region STRING
) TBLPROPERTIES (
    'file-index.bitmap.columns' = 'event_type,region',
    'file-index.bitmap.event_type.index-block-size' = '32kb',
    'file-index.in-manifest-threshold' = '10kb'
);

-- 插入数据
INSERT INTO user_events VALUES 
    (1, 'login', TIMESTAMP '2024-01-01 10:00:00', 'US'),
    (2, 'click', TIMESTAMP '2024-01-01 10:01:00', 'EU'),
    (3, 'login', TIMESTAMP '2024-01-01 10:02:00', 'US'),
    (4, 'purchase', TIMESTAMP '2024-01-01 10:03:00', 'ASIA'),
    (5, 'click', TIMESTAMP '2024-01-01 10:04:00', 'US'),
    (6, 'login', TIMESTAMP '2024-01-01 10:05:00', 'EU');
```[13](#0-12) 

在这个例子中:
- `file-index.bitmap.columns` 指定为 `event_type``region` 列创建 Bitmap 索引
- `file-index.bitmap.event_type.index-block-size``event_type` 列设置索引块大小为 32KB
- `file-index.in-manifest-threshold` 设置为 10KB,小于该大小的索引直接嵌入 manifest

#### 4.2.2 为已有表添加索引

如果表已经存在,可以使用 ALTER TABLE 语句添加索引配置,然后使用 `rewrite_file_index` 存储过程重写索引:

```sql
-- 修改表属性,添加 Bitmap 索引配置
ALTER TABLE user_events SET TBLPROPERTIES (
    'file-index.bitmap.columns' = 'event_type,region'
);

-- 调用 rewrite_file_index 存储过程(Flink)
CALL sys.rewrite_file_index('default.user_events');

4.3 索引创建的代码实现

4.3.1 IndexMaintainer 的工作机制

DataFileIndexWriter 为每个列的每种索引类型维护一个 IndexMaintainer 实例。当数据行写入时,IndexMaintainer 会提取对应列的值并更新索引。 14

对于普通列,使用 FileIndexMaintainer;对于 Map 类型列,使用 MapFileIndexMaintainer15

4.3.2 Bitmap 索引的 Writer 实现

BitmapFileIndex.Writer 负责构建 Bitmap 索引。它维护一个 Map<Object, RoaringBitmap32>,将每个不同的值映射到一个 bitmap,记录包含该值的所有行号。 16

关键实现细节:

  • 使用 rowNumber 计数器跟踪当前行号
  • 对于 NULL 值,更新 nullBitmap
  • 对于非 NULL 值,根据值从 id2bitmap 获取或创建对应的 bitmap,然后添加当前行号
  • 使用 valueMapper 函数处理特殊类型(如 Timestamp)
4.3.3 索引序列化过程

当数据文件写入完成后,需要序列化 Bitmap 索引。序列化过程分为几个步骤:

  1. 序列化所有 bitmap:将每个 RoaringBitmap32 序列化为字节数组
  2. 构建索引元数据:根据版本(V1 或 V2)构建 BitmapFileIndexMeta
  3. 写入版本号:写入 1 字节的版本号
  4. 写入元数据:调用 bitmapFileIndexMeta.serialize(dos) 写入元数据
  5. 写入 bitmap 数据:按顺序写入所有 bitmap 字节数组 17
4.3.4 V2 格式的索引块构建

对于 V2 格式,需要额外构建索引块。所有的 value-offset 对会先按值排序,然后动态分组到多个 BitmapIndexBlock 中,每个块的大小不超过 blockSizeLimit(默认 16KB)。 18

4.4 索引文件的存储位置

索引创建完成后,根据其大小决定存储位置:

if (out.size() > inManifestThreshold) {
    // 索引较大,写入独立的索引文件
    // 文件名:数据文件名 + .index 后缀
    try (OutputStream outputStream = fileIO.newOutputStream(path, true)) {
        outputStream.write(out.toByteArray());
    }
    resultFileName = path.getName();
} else {
    // 索引较小,嵌入 manifest
    embeddedIndexBytes = out.toByteArray();
}
```[19](#0-18) 

嵌入 manifest 的索引存储在 `DataFileMeta` 的 `_EMBEDDED_FILE_INDEX` 字段中,独立存储的索引文件路径存储在 `_EXTRA_FILES` 字段中。

## 五、查询执行中的索引使用

### 5.1 索引过滤的整体流程

当执行查询时,Paimon 会利用文件索引过滤数据文件和行:

1. **谓词下推**:将查询条件下推到存储层
2. **文件级过滤**:通过 manifest 统计信息过滤文件
3. **索引级过滤**:读取文件索引,进一步过滤文件或标记需要读取的行
4. **数据读取**:只读取未被过滤的数据 [20](#0-19) 

### 5.2 查询示例

#### 5.2.1 等值查询

```sql
-- 查询特定事件类型的记录
SELECT * FROM user_events WHERE event_type = 'login';

执行流程:

  1. 谓词解析:提取条件 event_type = 'login'
  2. 索引读取:读取 event_type 列的 Bitmap 索引
  3. Bitmap 查找:从索引中查找值 'login' 对应的 bitmap
  4. 行过滤:根据 bitmap 确定需要读取的行号
  5. 数据读取:只读取 bitmap 中标记的行(第 0、2、5 行)

假设数据文件包含以下内容:

Row 0: user_id=1, event_type='login', ...
Row 1: user_id=2, event_type='click', ...
Row 2: user_id=3, event_type='login', ...
Row 3: user_id=4, event_type='purchase', ...
Row 4: user_id=5, event_type='click', ...
Row 5: user_id=6, event_type='login', ...

Bitmap 索引内容(简化表示):

'login'    -> RoaringBitmap32[0, 2, 5]
'click'    -> RoaringBitmap32[1, 4]
'purchase' -> RoaringBitmap32[3]

查询结果:只读取行 0、2、5。 21

5.2.2 IN 查询
-- 查询多个事件类型
SELECT * FROM user_events WHERE event_type IN ('login', 'purchase');

执行流程:

  1. 多值查询:提取 IN 列表 ['login', 'purchase']
  2. Bitmap 合并:获取两个值的 bitmap 并进行 OR 操作
  3. 结果 bitmapRoaringBitmap32[0, 2, 5] OR RoaringBitmap32[3] = RoaringBitmap32[0, 2, 3, 5]
  4. 数据读取:读取行 0、2、3、5 22

底层实现使用 RoaringBitmap32.or() 方法合并多个 bitmap: 23

5.2.3 NOT IN 查询
-- 排除特定事件类型
SELECT * FROM user_events WHERE event_type NOT IN ('login');

执行流程:

  1. 获取 IN 结果:先执行 IN 查询得到 RoaringBitmap32[0, 2, 5]
  2. Bitmap 翻转:对结果进行翻转操作 flip(0, rowCount)
  3. 结果 bitmapRoaringBitmap32[1, 3, 4]
  4. 数据读取:读取除了 login 之外的行 24
5.2.4 NULL 值查询
-- 查询 NULL 值
SELECT * FROM user_events WHERE region IS NULL;

-- 查询非 NULL 值
SELECT * FROM user_events WHERE region IS NOT NULL;

Bitmap 索引专门维护了一个 nullBitmap 来跟踪 NULL 值的位置。IS NULL 查询直接返回 nullBitmap,IS NOT NULL 查询返回翻转后的结果。 25

5.2.5 复合查询(AND/OR)
-- AND 查询
SELECT * FROM user_events 
WHERE event_type = 'login' AND region = 'US';

-- OR 查询
SELECT * FROM user_events 
WHERE event_type = 'login' OR region = 'US';

对于复合查询,Paimon 会递归地评估每个子条件,然后根据逻辑操作符合并结果:

  • AND 操作:对两个 bitmap 执行 AND 操作(交集)
  • OR 操作:对两个 bitmap 执行 OR 操作(并集) 26 27

5.3 索引读取的实现细节

5.3.1 FileIndexPredicate 的工作流程

FileIndexPredicate 是索引过滤的核心类,负责:

  1. 读取索引文件(从独立文件或 manifest 嵌入数据)
  2. 根据查询谓词评估索引
  3. 返回过滤结果(SKIP、REMAIN 或 BitmapIndexResult) 28
5.3.2 Bitmap 索引的 Reader 实现

BitmapFileIndex.Reader 负责读取和查询 Bitmap 索引。它采用延迟加载策略,只在第一次使用时才读取索引元数据。 29

关键方法:

  • readInternalMeta():读取索引元数据(版本、行数、值偏移映射等)
  • readBitmap():根据值查找并读取对应的 bitmap
  • visitEqual()visitIn()visitNotIn() 等:实现不同的谓词访问接口
5.3.3 V2 格式的二级索引查找

对于 V2 格式,查找过程分为两步:

  1. 定位索引块:使用二分查找在 indexBlocks 列表中找到包含目标值的块
  2. 查找具体条目:在索引块内再次使用二分查找定位具体的 value-offset-length 条目 30 31

这种二级索引设计显著减少了高基数列的索引读取开销,特别是当只需要查询少数几个值时。

5.3.4 Bitmap 数据的实际读取

当找到目标值的 entry 后,根据 offset 和 length 从索引文件中读取序列化的 bitmap 数据,然后反序列化为 RoaringBitmap32 对象。 32

特殊处理:

  • 如果 offset 为负数,表示单值优化,直接构造包含单个位置的 bitmap
  • 如果 length 为 -1(V1 格式),直接从流中反序列化
  • 否则,先读取指定长度的字节,再反序列化

5.4 索引结果的应用

5.4.1 FileIndexEvaluator 的作用

FileIndexEvaluator 负责将索引结果应用到实际的数据读取过程中,它会:

  1. 读取文件的索引(嵌入或独立文件)
  2. 评估查询谓词
  3. 结合 Deletion Vector(如果有)
  4. 返回最终的过滤结果 33
5.4.2 与 Deletion Vector 的集成

当表启用了删除向量(Deletion Vector)时,索引结果需要与删除向量进行 AND-NOT 操作,以排除已删除的行。 34

5.4.3 过滤结果的三种状态

文件索引评估可能返回三种结果:

  1. REMAIN:保留整个文件,无需行级过滤
  2. SKIP:完全跳过该文件
  3. BitmapIndexResult:部分选择,包含具体的行号 bitmap 35

5.5 性能优化

5.5.1 索引缓存

为了避免重复读取相同的 bitmap,BitmapFileIndex.Reader 使用 Map<Object, RoaringBitmap32> 缓存已读取的 bitmap。 36

5.5.2 延迟计算

BitmapIndexResult 使用 LazyField 模式,bitmap 的实际计算和 OR/AND 操作只在真正需要时才执行。 37

5.5.3 Early Return

在 AND 查询中,如果某个子条件返回 SKIP,整个查询立即返回 SKIP,无需评估其他条件。 38

六、实战案例分析

6.1 完整的端到端示例

让我们通过一个完整的示例来演示 Bitmap 索引的工作流程:

-- 1. 创建表
CREATE TABLE orders (
    order_id BIGINT,
    user_id BIGINT,
    status STRING,
    region STRING,
    amount DECIMAL(10, 2),
    order_date DATE
) TBLPROPERTIES (
    'bucket' = '4',
    'file-index.bitmap.columns' = 'status,region',
    'file-index.bitmap.status.version' = '2',
    'file-index.bitmap.status.index-block-size' = '16kb',
    'file-index.in-manifest-threshold' = '5kb'
);

-- 2. 插入测试数据
INSERT INTO orders VALUES
    (1001, 1, 'PENDING', 'US', 100.00, DATE '2024-01-01'),
    (1002, 2, 'COMPLETED', 'EU', 200.00, DATE '2024-01-01'),
    (1003, 3, 'PENDING', 'ASIA', 150.00, DATE '2024-01-01'),
    (1004, 4, 'CANCELLED', 'US', 50.00, DATE '2024-01-02'),
    (1005, 5, 'COMPLETED', 'EU', 300.00, DATE '2024-01-02'),
    (1006, 6, 'PENDING', 'US', 120.00, DATE '2024-01-02'),
    (1007, 7, 'COMPLETED', 'ASIA', 250.00, DATE '2024-01-03'),
    (1008, 8, 'CANCELLED', 'EU', 80.00, DATE '2024-01-03'),
    (1009, 9, 'PENDING', 'ASIA', 180.00, DATE '2024-01-03'),
    (1010, 10, 'COMPLETED', 'US', 400.00, DATE '2024-01-04');

-- 3. 查询待处理的订单
SELECT order_id, user_id, region, amount 
FROM orders 
WHERE status = 'PENDING';
索引创建过程
  1. 数据写入时

    • 每写入一行,DataFileIndexWriter 调用 write(row)
    • BitmapFileIndex.Writer 更新内存中的映射:
      'PENDING'   -> RoaringBitmap32.add(0, 2, 5, 8)
      'COMPLETED' -> RoaringBitmap32.add(1, 4, 6, 9)
      'CANCELLED' -> RoaringBitmap32.add(3, 7)
      
      'US'   -> RoaringBitmap32.add(0, 3, 5, 9)
      'EU'   -> RoaringBitmap32.add(1, 4, 7)
      'ASIA' -> RoaringBitmap32.add(2, 6, 8)
      
  2. 数据文件关闭时

    • 调用 serializedBytes() 序列化索引
    • 生成索引文件或嵌入 manifest
    • 索引文件结构:
      [Version: 2]
      [Row Count: 10]
      [Non-null Bitmap Count: 3 (for status), 3 (for region)]
      [Has Null: false]
      [Index Blocks: 1]
      [Index Block 0: CANCELLED -> offset 0]
      [Bitmap Body Offset: xxx]
      [Bitmap Data: serialized RoaringBitmap32 bytes...]
      
查询执行过程
  1. 谓词解析

    • 提取条件:status = 'PENDING'
    • 转换为 LeafPredicate 对象
  2. 索引定位

    • 读取数据文件的索引(从 manifest 或独立文件)
    • 创建 FileIndexPredicate 实例
    • 调用 evaluate(predicate)
  3. Bitmap 查找

    • BitmapFileIndex.Reader.visitEqual() 被调用
    • 读取索引元数据(如果还未读取)
    • 在 V2 格式的索引块中二分查找 ‘PENDING’
    • 读取对应的 bitmap:RoaringBitmap32[0, 2, 5, 8]
  4. 结果应用

    • 返回 BitmapIndexResult 包装的 bitmap
    • 数据读取器根据 bitmap 只读取行 0、2、5、8
    • 最终返回 4 条记录:
      order_id=1001, user_id=1, region=US, amount=100.00
      order_id=1003, user_id=3, region=ASIA, amount=150.00
      order_id=1006, user_id=6, region=US, amount=120.00
      order_id=1009, user_id=9, region=ASIA, amount=180.00
      

6.2 性能对比

假设数据文件包含 100 万行数据,其中 ‘PENDING’ 状态只有 1000 行:

不使用索引

  • 需要读取整个数据文件(假设 100MB)
  • 扫描 100 万行,过滤出 1000 行
  • IO:100MB,CPU:100 万次比较

使用 Bitmap 索引

  • 读取索引文件(假设 50KB)
  • 查找 bitmap(几次内存查找)
  • 只读取 1000 行对应的数据块(假设 100KB)
  • IO:150KB,CPU:几次 bitmap 操作 + 1000 次数据读取

性能提升:约 600 倍的 IO 减少,约 1000 倍的 CPU 减少

七、最佳实践与注意事项

7.1 何时使用 Bitmap 索引

适合场景

  1. 低基数列(不同值数量 < 1000)
  2. 频繁的等值查询或 IN 查询
  3. 需要组合多个条件的场景
  4. 列值分布相对均匀

不适合场景

  1. 高基数列(如用户 ID、订单号等)- 应使用 Range Bitmap 索引
  2. 范围查询主导的场景 - 应使用 Range Bitmap 索引或排序
  3. 列值极度倾斜(如 99% 都是同一个值)

7.2 配置建议

-- 基本配置
'file-index.bitmap.columns' = 'col1,col2'

-- V2 格式(推荐)
'

### Citations

**File:** docs/content/concepts/spec/fileindex.md (L27-33)
```markdown
# File index

Define `file-index.${index_type}.columns`, Paimon will create its corresponding index file for each file. If the index
file is too small, it will be stored directly in the manifest, or in the directory of the data file. Each data file
corresponds to an index file, which has a separate file definition and can contain different types of indexes with
multiple columns.

File: docs/content/concepts/spec/fileindex.md (L34-84)

## Index File

File index file format. Put all column and offset in the header.

<pre>
 ______________________________________    _____________________
|     magic    |version|head length  |
|--------------------------------------|
|            column number             |
|--------------------------------------|
|   column 1        | index number    |
|--------------------------------------|
|  index name 1 |start pos |length   |
|--------------------------------------|
|  index name 2 |start pos |length   |
|--------------------------------------|
|  index name 3 |start pos |length   |
|--------------------------------------|            HEAD
|   column 2        | index number    |
|--------------------------------------|
|  index name 1 |start pos |length   |
|--------------------------------------|
|  index name 2 |start pos |length   |
|--------------------------------------|
|  index name 3 |start pos |length   |
|--------------------------------------|
|                 ...                  |
|--------------------------------------|
|                 ...                  |
|--------------------------------------|
|  redundant length |redundant bytes  |
|--------------------------------------|    ---------------------
|                BODY                  |
|                BODY                  |
|                BODY                  |             BODY
|                BODY                  |
|______________________________________|    _____________________
*
magic:                            8 bytes long, value is 1493475289347502L, BIG_ENDIAN
version:                          4 bytes int, BIG_ENDIAN
head length:                      4 bytes int, BIG_ENDIAN
column number:                    4 bytes int, BIG_ENDIAN
column x name:                    var bytes, Java modified-utf-8
index number:                     4 bytes int (how many column items below), BIG_ENDIAN
index name x:                     var bytes, Java modified-utf-8
start pos:                        4 bytes int, BIG_ENDIAN
length:                           4 bytes int, BIG_ENDIAN
redundant length:                 4 bytes int (for compatibility with later versions, in this version, content is zero)
redundant bytes:                  var bytes (for compatibility with later version, in this version, is empty)
BODY:                             column index bytes + column index bytes + column index bytes + .......
</pre>

File: docs/content/concepts/spec/fileindex.md (L107-168)

{{< tab "V2" >}}

Bitmap file index format (V2):

<pre>

Bitmap file index format (V2)
+-------------------------------------------------+-----------------
| version (1 byte) = 2                           |
+-------------------------------------------------+
| row count (4 bytes int)                        |
+-------------------------------------------------+
| non-null value bitmap number (4 bytes int)     |
+-------------------------------------------------+
| has null value (1 byte)                        |
+-------------------------------------------------+
| null value offset (4 bytes if has null value)  |       HEAD
+-------------------------------------------------+
| null bitmap length (4 bytes if has null value) |
+-------------------------------------------------+
| bitmap index block number (4 bytes int)        |
+-------------------------------------------------+
| value 1 | offset 1                             |
+-------------------------------------------------+
| value 2 | offset 2                             |
+-------------------------------------------------+
| ...                                            |
+-------------------------------------------------+
| bitmap body offset (4 bytes int)               |
+-------------------------------------------------+-----------------
| bitmap index block 1                           |
+-------------------------------------------------+
| bitmap index block 2                           |  INDEX BLOCKS
+-------------------------------------------------+
| ...                                            |
+-------------------------------------------------+-----------------
| serialized bitmap 1                            |
+-------------------------------------------------+
| serialized bitmap 2                            |
+-------------------------------------------------+  BITMAP BLOCKS
| serialized bitmap 3                            |
+-------------------------------------------------+
| ...                                            |
+-------------------------------------------------+-----------------

index block format:
+-------------------------------------------------+
| entry number (4 bytes int)                     |
+-------------------------------------------------+
| value 1 | offset 1 | length 1                  |
+-------------------------------------------------+
| value 2 | offset 2 | length 2                  |
+-------------------------------------------------+
| ...                                            |
+-------------------------------------------------+

value x:                       var bytes for any data type (as bitmap identifier)
offset:                        4 bytes int (when it is negative, it represents that there is only one value
                                 and its position is the inverse of the negative value)
length:                        4 bytes int
  
</pre>

File: docs/content/concepts/spec/fileindex.md (L172-216)

{{< tab "V1 (Legacy)" >}}

(Legacy) Bitmap file index format (V1):

You can configure `file-index.bitmap.<column_name>.version` to use legacy bitmap version 1.

<pre>

Bitmap file index format (V1)
+-------------------------------------------------+-----------------
| version (1 byte)                                |
+-------------------------------------------------+
| row count (4 bytes int)                         |
+-------------------------------------------------+
| non-null value bitmap number (4 bytes int)      |
+-------------------------------------------------+
| has null value (1 byte)                         |
+-------------------------------------------------+
| null value offset (4 bytes if has null value)   |       HEAD
+-------------------------------------------------+
| value 1 | offset 1                              |
+-------------------------------------------------+
| value 2 | offset 2                              |
+-------------------------------------------------+
| value 3 | offset 3                              |
+-------------------------------------------------+
| ...                                             |
+-------------------------------------------------+-----------------
| serialized bitmap 1                             |
+-------------------------------------------------+
| serialized bitmap 2                             |
+-------------------------------------------------+       BODY
| serialized bitmap 3                             |
+-------------------------------------------------+
| ...                                             |
+-------------------------------------------------+-----------------
*
value x:                       var bytes for any data type (as bitmap identifier)
offset:                        4 bytes int (when it is negative, it represents that there is only one value
                                 and its position is the inverse of the negative value)
</pre>

{{< /tab >}}

{{< /tabs >}}

File: docs/content/concepts/spec/fileindex.md (L218-222)

Integers are all BIG_ENDIAN.

Bitmap only support the following data type: TinyIntType, SmallIntType, IntType, BigIntType, DateType, TimeType,
LocalZonedTimestampType, TimestampType, CharType, VarCharType, StringType, BooleanType.

File: paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java (L47-98)

/**
 * File index file format. Put all column and offset in the header.
 *
 * <pre>
 *  ______________________________________    _____________________
 * |     magic    |version|head length  |
 * |--------------------------------------|
 * |            column number             |
 * |--------------------------------------|
 * |   column 1        | index number    |
 * |--------------------------------------|
 * |  index name 1 |start pos |length   |
 * |--------------------------------------|
 * |  index name 2 |start pos |length   |
 * |--------------------------------------|
 * |  index name 3 |start pos |length   |
 * |--------------------------------------|            HEAD
 * |   column 2        | index number    |
 * |--------------------------------------|
 * |  index name 1 |start pos |length   |
 * |--------------------------------------|
 * |  index name 2 |start pos |length   |
 * |--------------------------------------|
 * |  index name 3 |start pos |length   |
 * |--------------------------------------|
 * |                 ...                  |
 * |--------------------------------------|
 * |                 ...                  |
 * |--------------------------------------|
 * |  redundant length |redundant bytes  |
 * |--------------------------------------|    ---------------------
 * |                BODY                  |
 * |                BODY                  |
 * |                BODY                  |             BODY
 * |                BODY                  |
 * |______________________________________|    _____________________
 *
 * magic:                            8 bytes long
 * version:                          4 bytes int
 * head length:                      4 bytes int
 * column number:                    4 bytes int
 * column x:                         var bytes utf (length + bytes)
 * index number:                     4 bytes int (how many column items below)
 * index name x:                     var bytes utf
 * start pos:                        4 bytes int
 * length:                           4 bytes int
 * redundant length:                 4 bytes int (for compatibility with later versions, in this version, content is zero)
 * redundant bytes:                  var bytes (for compatibility with later version, in this version, is empty)
 * BODY:                             column index bytes + column index bytes + column index bytes + .......
 *
 * </pre>
 */

File: paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java (L100-116)


    private static final long MAGIC = 1493475289347502L;
    private static final int EMPTY_INDEX_FLAG = -1;

    enum Version {
        V_1(1);

        private final int version;

        Version(int version) {
            this.version = version;
        }

        public int version() {
            return version;
        }
    }

File: docs/content/append-table/query-performance.md (L61-76)

## Data Skipping By File Index

You can use file index too, it filters files by indexing on the reading side.

Define `file-index.bitmap.columns`, Data file index is an external index file and Paimon will create its
corresponding index file for each file. If the index file is too small, it will be stored directly in the manifest,
otherwise in the directory of the data file. Each data file corresponds to an index file, which has a separate file
definition and can contain different types of indexes with multiple columns.

Different file indexes may be efficient in different scenarios. For example bloom filter may speed up query in point lookup
scenario. Using a bitmap may consume more space but can result in greater accuracy.

* [BloomFilter]({{< ref "concepts/spec/fileindex#index-bloomfilter" >}}): `file-index.bloom-filter.columns`.
* [Bitmap]({{< ref "concepts/spec/fileindex#index-bitmap" >}}): `file-index.bitmap.columns`.
* [Range Bitmap]({{< ref "concepts/spec/fileindex#index-range-bitmap" >}}): `file-index.range-bitmap.columns`.

File: paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexMetaV2.java (L42-99)

/**
 * When the bitmap-indexed column cardinality is high, using the first version of the bitmap index
 * format will take a lot of time to read the entire dictionary. But in fact we don't need a full
 * dictionary when dealing with a small number of predicates, the performance of predicate hits on
 * the bitmap can be improved by creating a secondary index on the dictionary.
 *
 * <pre>
 * Bitmap file index format (V2)
 * +-------------------------------------------------+-----------------
 * | version (1 byte) = 2                           |
 * +-------------------------------------------------+
 * | row count (4 bytes int)                        |
 * +-------------------------------------------------+
 * | non-null value bitmap number (4 bytes int)     |
 * +-------------------------------------------------+
 * | has null value (1 byte)                        |
 * +-------------------------------------------------+
 * | null value offset (4 bytes if has null value)  |       HEAD
 * +-------------------------------------------------+
 * | null bitmap length (4 bytes if has null value) |
 * +-------------------------------------------------+
 * | bitmap index block number (4 bytes int)        |
 * +-------------------------------------------------+
 * | value 1 | offset 1                             |
 * +-------------------------------------------------+
 * | value 2 | offset 2                             |
 * +-------------------------------------------------+
 * | ...                                            |
 * +-------------------------------------------------+
 * | bitmap blocks offset (4 bytes int)             |
 * +-------------------------------------------------+-----------------
 * | bitmap index block 1                           |
 * +-------------------------------------------------+
 * | bitmap index block 2                           |  INDEX BLOCKS
 * +-------------------------------------------------+
 * | ...                                            |
 * +-------------------------------------------------+-----------------
 * | serialized bitmap 1                            |
 * +-------------------------------------------------+
 * | serialized bitmap 2                            |
 * +-------------------------------------------------+  BITMAP BLOCKS
 * | serialized bitmap 3                            |
 * +-------------------------------------------------+
 * | ...                                            |
 * +-------------------------------------------------+-----------------
 *
 * index block format:
 * +-------------------------------------------------+
 * | entry number (4 bytes int)                     |
 * +-------------------------------------------------+
 * | value 1 | offset 1 | length 1                  |
 * +-------------------------------------------------+
 * | value 2 | offset 2 | length 2                  |
 * +-------------------------------------------------+
 * | ...                                            |
 * +-------------------------------------------------+
 * </pre>
 */

File: paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexMetaV2.java (L197-219)

    @Override
    public Entry findEntry(Object bitmapId) {
        if (bitmapId == null) {
            if (hasNullValue) {
                return new Entry(null, nullValueOffset, nullBitmapLength);
            }
        } else {
            BitmapIndexBlock block = findBlock(bitmapId);
            if (block != null) {
                return block.findEntry(bitmapId);
            }
        }
        return null;
    }

    private BitmapIndexBlock findBlock(Object bitmapId) {
        Comparator<Object> comparator = getComparator(dataType);
        int idx =
                Collections.binarySearch(
                        indexBlocks, null, (b1, ignore) -> comparator.compare(b1.key, bitmapId));
        idx = idx < 0 ? -2 - idx : idx;
        return idx < 0 ? null : indexBlocks.get(idx);
    }

File: paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexMetaV2.java (L222-285)

    public void serialize(DataOutput out) throws Exception {

        ThrowableConsumer valueWriter = getValueWriter(out);

        out.writeInt(rowCount);
        out.writeInt(nonNullBitmapNumber);
        out.writeBoolean(hasNullValue);
        if (hasNullValue) {
            out.writeInt(nullValueOffset);
            out.writeInt(nullBitmapLength);
        }

        LinkedList<BitmapIndexBlock> indexBlocks = new LinkedList<>();
        this.indexBlocks = indexBlocks;
        if (!bitmapOffsets.isEmpty()) {
            indexBlocks.add(new BitmapIndexBlock(0));
        }
        Comparator<Object> comparator = getComparator(dataType);
        bitmapOffsets.entrySet().stream()
                .map(
                        it ->
                                new Entry(
                                        it.getKey(),
                                        it.getValue(),
                                        bitmapLengths == null
                                                ? -1
                                                : bitmapLengths.getOrDefault(it.getKey(), -1)))
                .sorted((e1, e2) -> comparator.compare(e1.key, e2.key))
                .forEach(
                        e -> {
                            BitmapIndexBlock last = indexBlocks.peekLast();
                            if (!last.tryAdd(e)) {
                                BitmapIndexBlock next =
                                        new BitmapIndexBlock(last.offset + last.serializedBytes);
                                indexBlocks.add(next);
                                if (!next.tryAdd(e)) {
                                    throw new RuntimeException("index fail");
                                }
                            }
                        });

        out.writeInt(indexBlocks.size());

        int bitmapBodyOffset = 0;
        for (BitmapIndexBlock e : indexBlocks) {
            // secondary entry
            valueWriter.accept(e.key);
            out.writeInt(e.offset);
            bitmapBodyOffset += e.serializedBytes;
        }

        // bitmap body offset
        out.writeInt(bitmapBodyOffset);

        // bitmap index blocks
        for (BitmapIndexBlock indexBlock : indexBlocks) {
            out.writeInt(indexBlock.entryList.size());
            for (Entry e : indexBlock.entryList) {
                valueWriter.accept(e.key);
                out.writeInt(e.offset);
                out.writeInt(e.length);
            }
        }
    }

File: paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexMetaV2.java (L363-373)

        Entry findEntry(Object bitmapId) {
            tryDeserialize();
            Comparator<Object> comparator = getComparator(dataType);
            int idx =
                    Collections.binarySearch(
                            entryList, null, (e1, ignore) -> comparator.compare(e1.key, bitmapId));
            if (idx >= 0) {
                return entryList.get(idx);
            }
            return null;
        }

File: paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndex.java (L33-33)

import org.apache.paimon.utils.RoaringBitmap32;

File: paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndex.java (L80-106)

    private static class Writer extends FileIndexWriter {

        private final int version;
        private final DataType dataType;
        private final Function<Object, Object> valueMapper;
        private final Map<Object, RoaringBitmap32> id2bitmap = new HashMap<>();
        private final RoaringBitmap32 nullBitmap = new RoaringBitmap32();
        private int rowNumber;
        private final Options options;

        public Writer(DataType dataType, Options options) {
            this.version = options.getInteger(VERSION, VERSION_2);
            this.dataType = dataType;
            this.valueMapper = getValueMapper(dataType);
            this.options = options;
        }

        @Override
        public void write(Object key) {
            if (key == null) {
                nullBitmap.add(rowNumber++);
            } else {
                id2bitmap
                        .computeIfAbsent(valueMapper.apply(key), k -> new RoaringBitmap32())
                        .add(rowNumber++);
            }
        }

File: paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndex.java (L108-190)

        @Override
        public byte[] serializedBytes() {

            try {

                ByteArrayOutputStream output = new ByteArrayOutputStream();
                DataOutputStream dos = new DataOutputStream(output);

                dos.writeByte(version);

                // 1.serialize bitmaps to bytes
                byte[] nullBitmapBytes = nullBitmap.serialize();
                Map<Object, byte[]> id2bitmapBytes =
                        id2bitmap.entrySet().stream()
                                .collect(
                                        Collectors.toMap(
                                                Map.Entry::getKey, e -> e.getValue().serialize()));

                // 2.build bitmap file index meta
                LinkedHashMap<Object, Integer> bitmapOffsets = new LinkedHashMap<>();
                LinkedList<byte[]> serializeBitmaps = new LinkedList<>();
                int[] offsetRef = {
                    nullBitmap.isEmpty() || nullBitmap.getCardinality() == 1
                            ? 0
                            : nullBitmapBytes.length
                };
                id2bitmap.forEach(
                        (k, v) -> {
                            if (v.getCardinality() == 1) {
                                bitmapOffsets.put(k, -1 - v.iterator().next());
                            } else {
                                byte[] bytes = id2bitmapBytes.get(k);
                                serializeBitmaps.add(bytes);
                                bitmapOffsets.put(k, offsetRef[0]);
                                offsetRef[0] += bytes.length;
                            }
                        });
                BitmapFileIndexMeta bitmapFileIndexMeta;
                if (version == VERSION_1) {
                    bitmapFileIndexMeta =
                            new BitmapFileIndexMeta(
                                    dataType,
                                    options,
                                    rowNumber,
                                    id2bitmap.size(),
                                    !nullBitmap.isEmpty(),
                                    nullBitmap.getCardinality() == 1
                                            ? -1 - nullBitmap.iterator().next()
                                            : 0,
                                    bitmapOffsets);
                } else if (version == VERSION_2) {
                    bitmapFileIndexMeta =
                            new BitmapFileIndexMetaV2(
                                    dataType,
                                    options,
                                    rowNumber,
                                    id2bitmap.size(),
                                    !nullBitmap.isEmpty(),
                                    nullBitmap.getCardinality() == 1
                                            ? -1 - nullBitmap.iterator().next()
                                            : 0,
                                    nullBitmapBytes.length,
                                    bitmapOffsets,
                                    offsetRef[0]);
                } else {
                    throw new RuntimeException("invalid version: " + version);
                }

                // 3.serialize meta
                bitmapFileIndexMeta.serialize(dos);

                // 4.serialize body
                if (nullBitmap.getCardinality() > 1) {
                    dos.write(nullBitmapBytes);
                }
                for (byte[] bytes : serializeBitmaps) {
                    dos.write(bytes);
                }
                return output.toByteArray();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

File: paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndex.java (L193-312)

    private static class Reader extends FileIndexReader {

        private final SeekableInputStream seekableInputStream;
        private final int headStart;
        private final Map<Object, RoaringBitmap32> bitmaps = new LinkedHashMap<>();

        private BitmapFileIndexMeta bitmapFileIndexMeta;
        private Function<Object, Object> valueMapper;

        private final Options options;

        public Reader(SeekableInputStream seekableInputStream, int start, Options options) {
            this.seekableInputStream = seekableInputStream;
            this.headStart = start;
            this.options = options;
        }

        @Override
        public FileIndexResult visitEqual(FieldRef fieldRef, Object literal) {
            return visitIn(fieldRef, Collections.singletonList(literal));
        }

        @Override
        public FileIndexResult visitNotEqual(FieldRef fieldRef, Object literal) {
            return visitNotIn(fieldRef, Collections.singletonList(literal));
        }

        @Override
        public FileIndexResult visitIn(FieldRef fieldRef, List<Object> literals) {
            return new BitmapIndexResult(
                    () -> {
                        readInternalMeta(fieldRef.type());
                        return getInListResultBitmap(literals);
                    });
        }

        @Override
        public FileIndexResult visitNotIn(FieldRef fieldRef, List<Object> literals) {
            return new BitmapIndexResult(
                    () -> {
                        readInternalMeta(fieldRef.type());
                        RoaringBitmap32 bitmap = getInListResultBitmap(literals);
                        bitmap.flip(0, bitmapFileIndexMeta.getRowCount());
                        return bitmap;
                    });
        }

        @Override
        public FileIndexResult visitIsNull(FieldRef fieldRef) {
            return visitIn(fieldRef, Collections.singletonList(null));
        }

        @Override
        public FileIndexResult visitIsNotNull(FieldRef fieldRef) {
            return visitNotIn(fieldRef, Collections.singletonList(null));
        }

        private RoaringBitmap32 getInListResultBitmap(List<Object> literals) {
            return RoaringBitmap32.or(
                    literals.stream()
                            .map(
                                    it ->
                                            bitmaps.computeIfAbsent(
                                                    valueMapper.apply(it), this::readBitmap))
                            .iterator());
        }

        private RoaringBitmap32 readBitmap(Object bitmapId) {
            try {
                BitmapFileIndexMeta.Entry entry = bitmapFileIndexMeta.findEntry(bitmapId);
                if (entry == null) {
                    return new RoaringBitmap32();
                } else {
                    int offset = entry.offset;
                    if (offset < 0) {
                        return RoaringBitmap32.bitmapOf(-1 - offset);
                    } else {
                        seekableInputStream.seek(bitmapFileIndexMeta.getBodyStart() + offset);
                        RoaringBitmap32 bitmap = new RoaringBitmap32();
                        int length = entry.length;
                        if (length != -1) {
                            DataInputStream input = new DataInputStream(seekableInputStream);
                            byte[] bytes = new byte[length];
                            input.readFully(bytes);
                            bitmap.deserialize(ByteBuffer.wrap(bytes));
                            return bitmap;
                        }
                        bitmap.deserialize(new DataInputStream(seekableInputStream));
                        return bitmap;
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        private void readInternalMeta(DataType dataType) {
            if (this.bitmapFileIndexMeta == null) {
                this.valueMapper = getValueMapper(dataType);
                try {
                    seekableInputStream.seek(headStart);
                    int version = seekableInputStream.read();
                    if (version == VERSION_1) {
                        this.bitmapFileIndexMeta = new BitmapFileIndexMeta(dataType, options);
                        this.bitmapFileIndexMeta.deserialize(seekableInputStream);
                    } else if (version == VERSION_2) {
                        this.bitmapFileIndexMeta = new BitmapFileIndexMetaV2(dataType, options);
                        this.bitmapFileIndexMeta.deserialize(seekableInputStream);
                    } else if (version > VERSION_2) {
                        throw new RuntimeException(
                                String.format(
                                        "read index file fail, "
                                                + "your plugin version is lower than %d",
                                        version));
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }

File: paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java (L51-149)

/** Index file writer for a data file. */
public final class DataFileIndexWriter implements Closeable {

    public static final FileIndexResult EMPTY_RESULT = FileIndexResult.of(null, null);

    private final FileIO fileIO;

    private final Path path;

    // if the filter size greater than fileIndexInManifestThreshold, we put it in file
    private final long inManifestThreshold;

    // index type, column name -> index maintainer
    private final Map<String, Map<String, IndexMaintainer>> indexMaintainers = new HashMap<>();

    private String resultFileName;

    private byte[] embeddedIndexBytes;

    public DataFileIndexWriter(
            FileIO fileIO,
            Path path,
            RowType rowType,
            FileIndexOptions fileIndexOptions,
            @Nullable Map<String, String> colNameMapping) {
        this.fileIO = fileIO;
        this.path = path;
        List<DataField> fields = rowType.getFields();
        Map<String, DataField> map = new HashMap<>();
        Map<String, Integer> index = new HashMap<>();
        fields.forEach(
                dataField -> {
                    map.put(dataField.name(), dataField);
                    index.put(dataField.name(), rowType.getFieldIndex(dataField.name()));
                });
        for (Map.Entry<FileIndexOptions.Column, Map<String, Options>> entry :
                fileIndexOptions.entrySet()) {
            FileIndexOptions.Column entryColumn = entry.getKey();
            String colName = entryColumn.getColumnName();
            if (colNameMapping != null) {
                colName = colNameMapping.getOrDefault(colName, null);
                if (colName == null) {
                    continue;
                }
            }

            String columnName = colName;
            DataField field = map.get(columnName);
            if (field == null) {
                throw new IllegalArgumentException(columnName + " does not exist in column fields");
            }

            for (Map.Entry<String, Options> typeEntry : entry.getValue().entrySet()) {
                String indexType = typeEntry.getKey();
                Map<String, IndexMaintainer> column2maintainers =
                        indexMaintainers.computeIfAbsent(indexType, k -> new HashMap<>());
                IndexMaintainer maintainer = column2maintainers.get(columnName);
                if (entryColumn.isNestedColumn()) {
                    if (field.type().getTypeRoot() != DataTypeRoot.MAP) {
                        throw new IllegalArgumentException(
                                "Column "
                                        + columnName
                                        + " is nested column, but is not map type. Only should map type yet.");
                    }
                    MapFileIndexMaintainer mapMaintainer = (MapFileIndexMaintainer) maintainer;
                    if (mapMaintainer == null) {
                        MapType mapType = (MapType) field.type();
                        mapMaintainer =
                                new MapFileIndexMaintainer(
                                        columnName,
                                        indexType,
                                        mapType.getKeyType(),
                                        mapType.getValueType(),
                                        fileIndexOptions.getMapTopLevelOptions(
                                                columnName, typeEntry.getKey()),
                                        index.get(columnName));
                        column2maintainers.put(columnName, mapMaintainer);
                    }
                    mapMaintainer.add(entryColumn.getNestedColumnName(), typeEntry.getValue());
                } else {
                    if (maintainer == null) {
                        maintainer =
                                new FileIndexMaintainer(
                                        columnName,
                                        indexType,
                                        FileIndexer.create(
                                                        indexType,
                                                        field.type(),
                                                        typeEntry.getValue())
                                                .createWriter(),
                                        InternalRow.createFieldGetter(
                                                field.type(), index.get(columnName)));
                        column2maintainers.put(columnName, maintainer);
                    }
                }
            }
        }
        this.inManifestThreshold = fileIndexOptions.fileIndexInManifestThreshold();
    }

File: paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java (L151-157)

    public void write(InternalRow row) {
        indexMaintainers
                .values()
                .forEach(
                        column2maintainers ->
                                column2maintainers.values().forEach(index -> index.write(row)));
    }

File: paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java (L159-176)

    @Override
    public void close() throws IOException {
        Map<String, Map<String, byte[]>> indexMaps = serializeMaintainers();

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (FileIndexFormat.Writer writer = FileIndexFormat.createWriter(out)) {
            writer.writeColumnIndexes(indexMaps);
        }

        if (out.size() > inManifestThreshold) {
            try (OutputStream outputStream = fileIO.newOutputStream(path, true)) {
                outputStream.write(out.toByteArray());
            }
            resultFileName = path.getName();
        } else {
            embeddedIndexBytes = out.toByteArray();
        }
    }

File: paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java (L249-279)

    /** One index maintainer for one column. */
    private static class FileIndexMaintainer implements IndexMaintainer {

        private final String columnName;
        private final String indexType;
        private final FileIndexWriter fileIndexWriter;
        private final InternalRow.FieldGetter getter;

        public FileIndexMaintainer(
                String columnName,
                String indexType,
                FileIndexWriter fileIndexWriter,
                InternalRow.FieldGetter getter) {
            this.columnName = columnName;
            this.indexType = indexType;
            this.fileIndexWriter = fileIndexWriter;
            this.getter = getter;
        }

        public void write(InternalRow row) {
            fileIndexWriter.writeRecord(getter.getFieldOrNull(row));
        }

        public String getIndexType() {
            return indexType;
        }

        public Map<String, byte[]> serializedBytes() {
            return Collections.singletonMap(columnName, fileIndexWriter.serializedBytes());
        }
    }

File: paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java (L98-106)

    @Test
    public void testReadWriteTableWithBitmapIndex() throws Catalog.TableNotExistException {

        spark.sql(
                "CREATE TABLE T(a int) TBLPROPERTIES ("
                        + "'file-index.bitmap.columns'='a',"
                        + "'file-index.bitmap.a.index-block-size'='32kb',"
                        + "'file-index.in-manifest-threshold'='1B');");
        spark.sql("INSERT INTO T VALUES (0),(1),(2),(3),(4),(5);");

File: paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java (L52-88)

/** Utils to check secondary index (e.g. bloom filter) predicate. */
public class FileIndexPredicate implements Closeable {

    private static final Logger LOG = LoggerFactory.getLogger(FileIndexPredicate.class);

    private final FileIndexFormat.Reader reader;

    @Nullable private Path path;

    public FileIndexPredicate(Path path, FileIO fileIO, RowType fileRowType) throws IOException {
        this(fileIO.newInputStream(path), fileRowType);
        this.path = path;
    }

    public FileIndexPredicate(byte[] serializedBytes, RowType fileRowType) {
        this(new ByteArraySeekableStream(serializedBytes), fileRowType);
    }

    public FileIndexPredicate(SeekableInputStream inputStream, RowType fileRowType) {
        this.reader = FileIndexFormat.createReader(inputStream, fileRowType);
    }

    public FileIndexResult evaluate(@Nullable Predicate predicate) {
        if (predicate == null) {
            return REMAIN;
        }
        Set<String> requiredFieldNames = getRequiredNames(predicate);
        Map<String, Collection<FileIndexReader>> indexReaders = new HashMap<>();
        requiredFieldNames.forEach(name -> indexReaders.put(name, reader.readColumnIndex(name)));
        FileIndexResult result = new FileIndexPredicateTest(indexReaders).test(predicate);
        if (!result.remain()) {
            LOG.debug(
                    "One file has been filtered: "
                            + (path == null ? "in scan stage" : path.toString()));
        }
        return result;
    }

File: paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java (L142-199)

    /** Predicate test worker. */
    private static class FileIndexPredicateTest implements PredicateVisitor<FileIndexResult> {

        private final Map<String, Collection<FileIndexReader>> columnIndexReaders;

        public FileIndexPredicateTest(Map<String, Collection<FileIndexReader>> fileIndexReaders) {
            this.columnIndexReaders = fileIndexReaders;
        }

        public FileIndexResult test(Predicate predicate) {
            return predicate.visit(this);
        }

        @Override
        public FileIndexResult visit(LeafPredicate predicate) {
            FileIndexResult compoundResult = REMAIN;
            FieldRef fieldRef =
                    new FieldRef(predicate.index(), predicate.fieldName(), predicate.type());
            for (FileIndexReader fileIndexReader : columnIndexReaders.get(predicate.fieldName())) {
                compoundResult =
                        compoundResult.and(
                                predicate
                                        .function()
                                        .visit(fileIndexReader, fieldRef, predicate.literals()));

                if (!compoundResult.remain()) {
                    return compoundResult;
                }
            }
            return compoundResult;
        }

        @Override
        public FileIndexResult visit(CompoundPredicate predicate) {
            if (predicate.function() instanceof Or) {
                FileIndexResult compoundResult = null;
                for (Predicate predicate1 : predicate.children()) {
                    compoundResult =
                            compoundResult == null
                                    ? predicate1.visit(this)
                                    : compoundResult.or(predicate1.visit(this));
                }
                return compoundResult == null ? REMAIN : compoundResult;
            } else {
                FileIndexResult compoundResult = null;
                for (Predicate predicate1 : predicate.children()) {
                    compoundResult =
                            compoundResult == null
                                    ? predicate1.visit(this)
                                    : compoundResult.and(predicate1.visit(this));
                    // if not remain, no need to test anymore
                    if (!compoundResult.remain()) {
                        return compoundResult;
                    }
                }
                return compoundResult == null ? REMAIN : compoundResult;
            }
        }

File: paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java (L28-56)

/** bitmap file index result. */
public class BitmapIndexResult extends LazyField<RoaringBitmap32> implements FileIndexResult {

    public BitmapIndexResult(Supplier<RoaringBitmap32> supplier) {
        super(supplier);
    }

    @Override
    public boolean remain() {
        return !get().isEmpty();
    }

    @Override
    public FileIndexResult and(FileIndexResult fileIndexResult) {
        if (fileIndexResult instanceof BitmapIndexResult) {
            return new BitmapIndexResult(
                    () -> RoaringBitmap32.and(get(), ((BitmapIndexResult) fileIndexResult).get()));
        }
        return FileIndexResult.super.and(fileIndexResult);
    }

    @Override
    public FileIndexResult or(FileIndexResult fileIndexResult) {
        if (fileIndexResult instanceof BitmapIndexResult) {
            return new BitmapIndexResult(
                    () -> RoaringBitmap32.or(get(), ((BitmapIndexResult) fileIndexResult).get()));
        }
        return FileIndexResult.super.or(fileIndexResult);
    }

File: paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java (L42-98)

/** Evaluate file index result. */
public class FileIndexEvaluator {

    public static FileIndexResult evaluate(
            FileIO fileIO,
            TableSchema dataSchema,
            List<Predicate> dataFilter,
            @Nullable TopN topN,
            @Nullable Integer limit,
            DataFilePathFactory dataFilePathFactory,
            DataFileMeta file,
            @Nullable DeletionVector dv)
            throws IOException {
        if (isNullOrEmpty(dataFilter) && topN == null) {
            if (limit == null) {
                return FileIndexResult.REMAIN;
            } else {
                // limit can not work with other predicates.
                return createBaseSelection(file, dv).limit(limit);
            }
        }

        try (FileIndexPredicate predicate =
                createFileIndexPredicate(fileIO, dataSchema, dataFilePathFactory, file)) {
            if (predicate == null) {
                return FileIndexResult.REMAIN;
            }

            BitmapIndexResult selection = createBaseSelection(file, dv);
            FileIndexResult result;
            if (!isNullOrEmpty(dataFilter)) {
                Predicate filter = PredicateBuilder.and(dataFilter.toArray(new Predicate[0]));
                result = predicate.evaluate(filter);
                result.and(selection);
            } else if (topN != null) {
                // 1. TopN cannot work with filter, because a filter may not completely filter out
                // all records, any unfiltered records can affect the calculation results of TopN
                // 2. evaluateTopN with selection, because we must filter out the data based on
                // deletion vector before selecting TopN records.
                result = predicate.evaluateTopN(topN, selection);
            } else {
                return FileIndexResult.REMAIN;
            }

            // if all position selected, or if only and not the deletion
            // the effect will not obvious, just return REMAIN.
            if (Objects.equals(result, selection)) {
                return FileIndexResult.REMAIN;
            }

            if (!result.remain()) {
                return FileIndexResult.SKIP;
            }

            return result;
        }
    }

File: paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java (L100-109)

    private static BitmapIndexResult createBaseSelection(
            DataFileMeta file, @Nullable DeletionVector dv) {
        BitmapIndexResult selection =
                new BitmapIndexResult(() -> RoaringBitmap32.bitmapOfRange(0, file.rowCount()));
        if (dv instanceof BitmapDeletionVector) {
            RoaringBitmap32 deletion = ((BitmapDeletionVector) dv).get();
            selection = selection.andNot(deletion);
        }
        return selection;
    }
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐