使用 Gravitino 与 Apache Flink 进行流式处理
在本教程中,您将学习如何使用 Apache Gravitino 与 Apache Flink 构建一个简单的流式管道。您将在 Gravitino 中创建一个 Hive catalog 和一个 Paimon catalog,在 Hive catalog 中定义一个基于 Kafka 的,然后使用 Flink SQL(通过 Gravitino Flink connector)从 Kafka 读取数据并写

作者:xiaojing
最后更新:2026-03-11
概述
在本教程中,您将学习如何使用 Apache Gravitino 与 Apache Flink 构建一个简单的流式管道。您将在 Gravitino 中创建一个 Hive catalog 和一个 Paimon catalog,在 Hive catalog 中定义一个基于 Kafka 的 generic table,然后使用 Flink SQL(通过 Gravitino Flink connector)从 Kafka 读取数据并写入 Paimon table。
您将完成的任务:
- 配置 Gravitino Flink connector
- 在 Gravitino 中创建 Hive 和 Paimon catalog
- 定义 Kafka generic table
- 使用 Flink SQL 将数据从 Kafka 流式写入 Paimon
架构概览:
前提条件
系统要求:
- Linux 或 macOS
- JDK 17+(Gravitino 服务器所需;本教程假设使用 JDK 17 或更高版本)
- Apache Flink 1.18(推荐用于 Gravitino Flink connector)
必需组件:
- Gravitino 服务器 v1.2.0 或更高版本(本教程需要 v1.1.0 之后引入的功能;参见 02-setup-guide/README.md)
- Hive Metastore(用于 Hive catalog)
- Apache Kafka broker(用于 Kafka 源表)
建议版本:
- 与您的 Flink 版本匹配的 Apache Paimon connector JAR
继续之前,请验证您的 Java 和 Flink 安装:
${JAVA_HOME}/bin/java -version
${FLINK_HOME}/bin/flink --version
分步指南
步骤 1:设置环境变量
以下值在整个教程中使用,请根据您的环境进行调整:
export GRAVITINO_URI="http://localhost:8090"
export METALAKE_NAME="default_metalake"
export HIVE_METASTORE_URI="thrift://localhost:9083"
export PAIMON_WAREHOUSE="file:///tmp/paimon-warehouse"
export KAFKA_BROKERS="localhost:9092"
步骤 2:在 Gravitino 中创建 Hive 和 Paimon Catalog
使用 Gravitino REST API 创建 Hive catalog 和 Paimon catalog。
如果需要传递 Hive 特定的配置(例如 hive-conf-dir),请在 catalog 属性中使用 flink.bypass. 前缀(例如 flink.bypass.hive-conf-dir),这些配置会被转发给 Flink Hive connector。
# 创建 Hive catalog
curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \
-H "Content-Type: application/json" -d '{
"name": "hive_catalog",
"type": "relational",
"comment": "Hive catalog for Flink streaming",
"provider": "hive",
"properties": {
"metastore.uris": "'"$HIVE_METASTORE_URI"'"
}
}' ${GRAVITINO_URI}/api/metalakes/${METALAKE_NAME}/catalogs
# 创建 Paimon catalog(文件系统后端)
curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \
-H "Content-Type: application/json" -d '{
"name": "paimon_catalog",
"type": "relational",
"comment": "Paimon catalog for Flink streaming",
"provider": "lakehouse-paimon",
"properties": {
"catalog-backend": "filesystem",
"warehouse": "'"$PAIMON_WAREHOUSE"'"
}
}' ${GRAVITINO_URI}/api/metalakes/${METALAKE_NAME}/catalogs
步骤 3:在 Flink 中安装所需的 JAR
将以下 JAR 放入 FLINK_HOME/lib 目录,以便 Flink SQL 能够加载它们:
gravitino-flink-connector-runtime-1.18_2.12-<version>.jarpaimon-flink-1.18-<version>.jarflink-sql-connector-kafka-<version>.jar- Flink HiveCatalog 所需的 Hive 依赖(与 Flink-Hive 集成相同)
提示:Kafka SQL connector 不包含在 Flink 二进制发行版中,需要单独添加。
步骤 4:配置 Flink 使用 Gravitino catalog store
编辑 FLINK_HOME/conf/flink-conf.yaml,添加以下配置(请替换为您的实际值):
table.catalog-store.kind: gravitino
table.catalog-store.gravitino.gravitino.metalake: ${METALAKE_NAME}
table.catalog-store.gravitino.gravitino.uri: ${GRAVITINO_URI}
如果 Flink 正在运行,请重启它,然后确保 Flink 集群可访问:
${FLINK_HOME}/bin/start-cluster.sh
curl -sS http://localhost:8081/overview
如果 curl 返回连接被拒绝,步骤 7 中的 INSERT INTO ... SELECT ... 将会失败,因为 SQL Client 无法向集群提交作业。
步骤 5:在 Hive Catalog 中创建 Kafka Generic Table
Flink 的 HiveCatalog 支持 Hive 兼容表和 generic table。在 HiveCatalog 中,除非您显式设置 'connector' = 'hive' 或使用 Hive dialect,否则 table 默认为 generic table。这里我们创建一个 Kafka generic table,元数据存储在 Hive Metastore 中,而数据由 Flink 从 Kafka 读取。如果您需要 Hive 兼容表,请使用 Hive dialect 或设置 'connector' = 'hive'。
启动 Flink SQL 客户端:
${FLINK_HOME}/bin/sql-client.sh
在 SQL 客户端中执行以下语句:
-- 使用 Gravitino 管理的 Hive catalog
USE CATALOG hive_catalog;
CREATE DATABASE IF NOT EXISTS streaming_db;
USE streaming_db;
-- Kafka 源表,作为 generic table 存储在 Hive catalog 中
CREATE TABLE kafka_events (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = '${KAFKA_BROKERS}', -- 替换为您的 Kafka brokers
'properties.group.id' = 'gravitino-flink-demo',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.ignore-parse-errors' = 'true'
);
关于 generic table 的说明:
- HiveCatalog 支持 Hive 兼容表和 generic table。Hive 兼容表以 Hive 兼容的方式存储,可以从 Hive 中查询。
- Generic table 是 Flink 特有的。Hive 可以在 Hive Metastore 中看到元数据,但通常无法解析它,因此从 Hive 查询是未定义行为。
- 如果您需要使用默认 dialect 的 Hive 兼容表,请设置
'connector' = 'hive'。如果使用 Hive dialect,则不需要connector属性。 - 在 Gravitino 中,generic table 的 schema 和分区键存储在 Hive Metastore 的
flink.*属性中。如果connector=hive,则该 table 被视为具有原生 Hive schema 的 Hive 兼容表。
步骤 6:创建 Paimon Sink Table
USE CATALOG paimon_catalog;
CREATE DATABASE IF NOT EXISTS streaming_db;
USE streaming_db;
CREATE TABLE paimon_user_behavior (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
ts TIMESTAMP_LTZ(3)
);
步骤 7:将数据从 Kafka 流式写入 Paimon
SET 'execution.checkpointing.interval' = '10 s';
INSERT INTO paimon_catalog.streaming_db.paimon_user_behavior
SELECT user_id, item_id, behavior, ts
FROM hive_catalog.streaming_db.kafka_events;
如果 Kafka 正在接收 user_behavior 主题的数据,Flink 将持续将其写入 Paimon table。
流式写入 Paimon 需要定期 checkpoint 来提交数据。
代码示例
Kafka 示例消息(JSON 格式):
{"user_id": 1, "item_id": 1001, "behavior": "click"}
{"user_id": 2, "item_id": 1002, "behavior": "buy"}
故障排除
- Catalog 在 Flink 中不可见:检查
flink-conf.yaml中的table.catalog-store.*配置,并确认 Gravitino 服务器可访问。 - ClassNotFoundException:确保 Gravitino connector、Kafka connector 和 Paimon JAR 已放入
FLINK_HOME/lib目录。 - java.net.ConnectException: Connection refused(执行
INSERT INTO时):Flink SQL 客户端无法访问 JobManager REST 端点(默认localhost:8081)。使用${FLINK_HOME}/bin/start-cluster.sh启动集群,并通过curl http://localhost:8081/overview验证。 - 作业处于 RUNNING 状态但 Paimon 中没有新数据:确保在流式模式下启用了 checkpoint(例如
SET 'execution.checkpointing.interval' = '10 s';),并在 Flink Web UI 或/jobs/<job-id>/checkpoints中检查 checkpoint 进度。 - 作业处于 RUNNING 状态但重新运行后预期记录被跳过:Kafka 偏移量由
properties.group.id跟踪。如果需要重新消费,请使用新的 group id(例如gravitino-flink-demo-v2)。 - Table 未找到:使用完全限定名称,如
hive_catalog.streaming_db.kafka_events和paimon_catalog.streaming_db.paimon_user_behavior。
恭喜
您已成功完成 Gravitino Flink 流式处理教程!
您现在拥有一个功能完整的 Flink 流式处理环境,集成了 Gravitino,包括:
- 配置好的 Gravitino Flink connector,用于统一的 catalog 访问
- 在 Gravitino 中注册的 Hive 和 Paimon catalog,可从 Flink SQL 访问
- 一个可工作的流式管道,从 Kafka 读取数据并写入 Paimon
- 理解 HiveCatalog 中 generic table 与 Hive 兼容表的区别
您的 Flink 环境现在已准备好利用 Gravitino 在流式数据生态系统中进行统一的元数据管理。
延伸阅读
有关更高级的配置和详细文档:
- 查看 Gravitino Flink Connector 文档 了解高级配置选项
- 学习 Apache Flink SQL 了解更多查询模式
- 探索 Apache Paimon with Flink 了解 Paimon 特定功能
下一步
- 探索 Gravitino 的 Iceberg catalog,参见 03-iceberg-catalog/README.md
- 使用 Trino 进行联邦查询,参见 06-trino-query/README.md
- 关注并 star Apache Gravitino 仓库
Apache Gravitino 正在快速发展,本文基于最新版本 1.1.0 编写。如果您遇到问题,请参考官方文档或在 GitHub 上提交 issue。
更多推荐



所有评论(0)