在这里插入图片描述

作者:xiaojing
最后更新:2026-03-11

概述

在本教程中,您将学习如何使用 Apache Gravitino 与 Apache Flink 构建一个简单的流式管道。您将在 Gravitino 中创建一个 Hive catalog 和一个 Paimon catalog,在 Hive catalog 中定义一个基于 Kafka 的 generic table,然后使用 Flink SQL(通过 Gravitino Flink connector)从 Kafka 读取数据并写入 Paimon table。

您将完成的任务:

  • 配置 Gravitino Flink connector
  • 在 Gravitino 中创建 Hive 和 Paimon catalog
  • 定义 Kafka generic table
  • 使用 Flink SQL 将数据从 Kafka 流式写入 Paimon

架构概览:
Gravitino Flink Streaming Architecture

前提条件

系统要求:

  • Linux 或 macOS
  • JDK 17+(Gravitino 服务器所需;本教程假设使用 JDK 17 或更高版本)
  • Apache Flink 1.18(推荐用于 Gravitino Flink connector)

必需组件:

  • Gravitino 服务器 v1.2.0 或更高版本(本教程需要 v1.1.0 之后引入的功能;参见 02-setup-guide/README.md
  • Hive Metastore(用于 Hive catalog)
  • Apache Kafka broker(用于 Kafka 源表)

建议版本:

  • 与您的 Flink 版本匹配的 Apache Paimon connector JAR

继续之前,请验证您的 Java 和 Flink 安装:

${JAVA_HOME}/bin/java -version
${FLINK_HOME}/bin/flink --version

分步指南

步骤 1:设置环境变量

以下值在整个教程中使用,请根据您的环境进行调整:

export GRAVITINO_URI="http://localhost:8090"
export METALAKE_NAME="default_metalake"
export HIVE_METASTORE_URI="thrift://localhost:9083"
export PAIMON_WAREHOUSE="file:///tmp/paimon-warehouse"
export KAFKA_BROKERS="localhost:9092"

步骤 2:在 Gravitino 中创建 Hive 和 Paimon Catalog

使用 Gravitino REST API 创建 Hive catalog 和 Paimon catalog。
如果需要传递 Hive 特定的配置(例如 hive-conf-dir),请在 catalog 属性中使用 flink.bypass. 前缀(例如 flink.bypass.hive-conf-dir),这些配置会被转发给 Flink Hive connector。

# 创建 Hive catalog
curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \
  -H "Content-Type: application/json" -d '{
    "name": "hive_catalog",
    "type": "relational",
    "comment": "Hive catalog for Flink streaming",
    "provider": "hive",
    "properties": {
      "metastore.uris": "'"$HIVE_METASTORE_URI"'"
    }
  }' ${GRAVITINO_URI}/api/metalakes/${METALAKE_NAME}/catalogs

# 创建 Paimon catalog(文件系统后端)
curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \
  -H "Content-Type: application/json" -d '{
    "name": "paimon_catalog",
    "type": "relational",
    "comment": "Paimon catalog for Flink streaming",
    "provider": "lakehouse-paimon",
    "properties": {
      "catalog-backend": "filesystem",
      "warehouse": "'"$PAIMON_WAREHOUSE"'"
    }
  }' ${GRAVITINO_URI}/api/metalakes/${METALAKE_NAME}/catalogs

步骤 3:在 Flink 中安装所需的 JAR

将以下 JAR 放入 FLINK_HOME/lib 目录,以便 Flink SQL 能够加载它们:

  • gravitino-flink-connector-runtime-1.18_2.12-<version>.jar
  • paimon-flink-1.18-<version>.jar
  • flink-sql-connector-kafka-<version>.jar
  • Flink HiveCatalog 所需的 Hive 依赖(与 Flink-Hive 集成相同)

提示:Kafka SQL connector 不包含在 Flink 二进制发行版中,需要单独添加。

步骤 4:配置 Flink 使用 Gravitino catalog store

编辑 FLINK_HOME/conf/flink-conf.yaml,添加以下配置(请替换为您的实际值):

table.catalog-store.kind: gravitino
table.catalog-store.gravitino.gravitino.metalake: ${METALAKE_NAME}
table.catalog-store.gravitino.gravitino.uri: ${GRAVITINO_URI}

如果 Flink 正在运行,请重启它,然后确保 Flink 集群可访问:

${FLINK_HOME}/bin/start-cluster.sh
curl -sS http://localhost:8081/overview

如果 curl 返回连接被拒绝,步骤 7 中的 INSERT INTO ... SELECT ... 将会失败,因为 SQL Client 无法向集群提交作业。

步骤 5:在 Hive Catalog 中创建 Kafka Generic Table

Flink 的 HiveCatalog 支持 Hive 兼容表和 generic table。在 HiveCatalog 中,除非您显式设置 'connector' = 'hive' 或使用 Hive dialect,否则 table 默认为 generic table。这里我们创建一个 Kafka generic table,元数据存储在 Hive Metastore 中,而数据由 Flink 从 Kafka 读取。如果您需要 Hive 兼容表,请使用 Hive dialect 或设置 'connector' = 'hive'

启动 Flink SQL 客户端:

${FLINK_HOME}/bin/sql-client.sh

在 SQL 客户端中执行以下语句:

-- 使用 Gravitino 管理的 Hive catalog
USE CATALOG hive_catalog;

CREATE DATABASE IF NOT EXISTS streaming_db;
USE streaming_db;

-- Kafka 源表,作为 generic table 存储在 Hive catalog 中
CREATE TABLE kafka_events (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING,
  ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = '${KAFKA_BROKERS}', -- 替换为您的 Kafka brokers
  'properties.group.id' = 'gravitino-flink-demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true'
);

关于 generic table 的说明:

  • HiveCatalog 支持 Hive 兼容表和 generic table。Hive 兼容表以 Hive 兼容的方式存储,可以从 Hive 中查询。
  • Generic table 是 Flink 特有的。Hive 可以在 Hive Metastore 中看到元数据,但通常无法解析它,因此从 Hive 查询是未定义行为。
  • 如果您需要使用默认 dialect 的 Hive 兼容表,请设置 'connector' = 'hive'。如果使用 Hive dialect,则不需要 connector 属性。
  • 在 Gravitino 中,generic table 的 schema 和分区键存储在 Hive Metastore 的 flink.* 属性中。如果 connector=hive,则该 table 被视为具有原生 Hive schema 的 Hive 兼容表。

步骤 6:创建 Paimon Sink Table

USE CATALOG paimon_catalog;

CREATE DATABASE IF NOT EXISTS streaming_db;
USE streaming_db;

CREATE TABLE paimon_user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING,
  ts TIMESTAMP_LTZ(3)
);

步骤 7:将数据从 Kafka 流式写入 Paimon

SET 'execution.checkpointing.interval' = '10 s';

INSERT INTO paimon_catalog.streaming_db.paimon_user_behavior
SELECT user_id, item_id, behavior, ts
FROM hive_catalog.streaming_db.kafka_events;

如果 Kafka 正在接收 user_behavior 主题的数据,Flink 将持续将其写入 Paimon table。
流式写入 Paimon 需要定期 checkpoint 来提交数据。

代码示例

Kafka 示例消息(JSON 格式):

{"user_id": 1, "item_id": 1001, "behavior": "click"}
{"user_id": 2, "item_id": 1002, "behavior": "buy"}

故障排除

  • Catalog 在 Flink 中不可见:检查 flink-conf.yaml 中的 table.catalog-store.* 配置,并确认 Gravitino 服务器可访问。
  • ClassNotFoundException:确保 Gravitino connector、Kafka connector 和 Paimon JAR 已放入 FLINK_HOME/lib 目录。
  • java.net.ConnectException: Connection refused(执行 INSERT INTO 时):Flink SQL 客户端无法访问 JobManager REST 端点(默认 localhost:8081)。使用 ${FLINK_HOME}/bin/start-cluster.sh 启动集群,并通过 curl http://localhost:8081/overview 验证。
  • 作业处于 RUNNING 状态但 Paimon 中没有新数据:确保在流式模式下启用了 checkpoint(例如 SET 'execution.checkpointing.interval' = '10 s';),并在 Flink Web UI 或 /jobs/<job-id>/checkpoints 中检查 checkpoint 进度。
  • 作业处于 RUNNING 状态但重新运行后预期记录被跳过:Kafka 偏移量由 properties.group.id 跟踪。如果需要重新消费,请使用新的 group id(例如 gravitino-flink-demo-v2)。
  • Table 未找到:使用完全限定名称,如 hive_catalog.streaming_db.kafka_eventspaimon_catalog.streaming_db.paimon_user_behavior

恭喜

您已成功完成 Gravitino Flink 流式处理教程!

您现在拥有一个功能完整的 Flink 流式处理环境,集成了 Gravitino,包括:

  • 配置好的 Gravitino Flink connector,用于统一的 catalog 访问
  • 在 Gravitino 中注册的 Hive 和 Paimon catalog,可从 Flink SQL 访问
  • 一个可工作的流式管道,从 Kafka 读取数据并写入 Paimon
  • 理解 HiveCatalog 中 generic table 与 Hive 兼容表的区别

您的 Flink 环境现在已准备好利用 Gravitino 在流式数据生态系统中进行统一的元数据管理。

延伸阅读

有关更高级的配置和详细文档:

下一步


Apache Gravitino 正在快速发展,本文基于最新版本 1.1.0 编写。如果您遇到问题,请参考官方文档或在 GitHub 上提交 issue。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐