数据集成框架SeaTunnel-Transform体验——LLM和Embedding
SeaTunnel是一个高性能的开源数据集成平台,支持多数据源实时同步。本文介绍了通过Docker方式安装SeaTunnel 2.3.12版本,并演示了使用Transform-LLM功能处理MySQL数据的案例。配置示例展示了如何从MySQL读取文章链接,通过自定义LLM模型(OpenAI)分析链接内容并输出摘要到控制台。运行命令只需挂载配置文件即可执行数据处理任务。该方案适用于需要AI辅助分析数
1. 简介

如SeaTunnel官网所述,SeaTunnel是一个高性能的数据集成平台,还支持大数据源多数据源的数据实时同步。
SeaTunnel is a very easy-to-use, multimodal, ultra-high-performance, distributed data integration platform that supports real-time synchronization of massive data.
当然,它也是开源的,地址为:
https://github.com/apache/seatunnel
2. 安装(docker方式)
SeaTunnel官方提供了以下3种安装方式:
- Locally(本地安装)
- Docker容器安装
- K8s(helm/kubernetes均支持)
这3种安装方式均支持SeaTunnel的集群(Cluster)和本地(Local)方式部署。
在本文中为了演示方便,直接使用Docker容器,且使用LocalMode方式运行。
具体命令参考:https://seatunnel.apache.org/docs/2.3.12/start-v2/docker/#set-up-with-docker-in-local-mode
我这里用到的命令如下:
docker pull apache/seatunnel:2.3.12
TIPS:如果由于网络原因拉取镜像失败,可尝试通过配置docker的网络代理解决。
配置方式为:
sudo mkdir -p /etc/systemd/system/docker.service.d
sudo vi /etc/systemd/system/docker.service.d/proxy.conf
#写入对应的配置项
[Service]
Environment="HTTP_PROXY=socks5://192.168.32.155:7078"
Environment="HTTPS_PROXY=socks5://192.168.32.155:7078"
Environment="NO_PROXY=localhost,127.0.0.1,::1"
#重启docker服务
sudo systemctl daemon-reload
sudo systemctl restart docker
#验证
sudo systemctl show --property=Environment docker
#输出示例
root@ubuntu:~# sudo systemctl show --property=Environment docker
Environment=HTTP_PROXY=socks5://192.168.32.155:7078 HTTPS_PROXY=socks5://192.168.32.155:7078 NO_PROXY=localhost,127.0.0.1,::1
运行示例:
docker run --rm -it apache/seatunnel:2.3.12 ./bin/seatunnel.sh -m local -c config/v2.batch.config.template
示例输出:
其中的主要配置脚本为:
{
"env" : {
"parallelism" : 2,
"job.mode" : "BATCH",
"checkpoint.interval" : 10000
},
"source" : [
{
"parallelism" : 2,
"plugin_output" : "fake",
"row.num" : 16,
"schema" : {
"fields" : {
"name" : "string",
"age" : "int"
}
},
"plugin_name" : "FakeSource"
}
],
"sink" : [
{
"plugin_name" : "Console"
}
]
}
作用是将FakeSource的数据输出到Console控制台中。
另外,上面示例中运行的SeaTunnel使用的示例引擎为SeaTunnel Engine,是SeaTunnel中的默认引擎,缩写Zeta(不是阿里的那个分布式事务seata)。
除此之外,它还支持Flink和Spark引擎,本文中不再详细展开。
3. SeaTunnel-Transform
SeaTunnel对于数据的处理主要分为以下3个部分:
Source -> Transform -> Sink

其中的Transform则为对数据的处理部分。关于SeaTunnel-Transform的详细介绍可移步:https://seatunnel.apache.org/docs/2.3.12/transform-v2
随着这几年AI相关的技术的火热,在SeaTunnel的最近迭代中,SeaTunnel在Transform部分也对AI相关的技术栈进行了支持。如借助LLM和Embedding对数据进行处理。
下面就以LLM和Embedding为例,看下在SeaTunnel中是如何使用它们的。
4. SeaTunnel-Transform-LLM使用示例
假设以下场景:
存在一个mysql的数据表,根据表格中的链接,分析出此链接的具体内容是什么,并进行一个总结,并将总结的内容输出到控制台中。
表格内容如下:
| id | link |
|---|---|
| 1 | https://blog.csdn.net/puhaiyang/article/details/154215171 |
| 2 | https://blog.csdn.net/puhaiyang/article/details/150501152 |
| 3 | https://blog.csdn.net/puhaiyang/article/details/149770381 |
| n | …… |
整体流程为:
[mysql-table] -> [SeaTunnel-Trasform-LLM] -> [console]
这里以OpenAi的LLM为例,演示一下实现过程。
编写一个SeaTunnel配置脚本,名为mysql_llm_2_console.conf,内容如下:
env {
parallelism = 1
job.mode = "BATCH"
read_limit.rows_per_second = 10
}
source {
Jdbc {
url = "jdbc:mysql://192.168.31.174:3306/testdb?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
username = "root"
password = "123456"
query = "select link from articles"
}
}
transform {
LLM {
model_provider = CUSTOM
model = bot-202512_YOUR_BOT_ID
api_path = "https://ark.cn-beijing.volces.com/api/v3/bots/chat/completions"
custom_config={
custom_response_parse = "$.choices[*].message.content"
custom_request_headers = {
Content-Type = "application/json; charset=UTF-8"
Authorization = "Bearer YOUR_API_KEY"
}
custom_request_body ={
model = "${model}"
messages = [
{
role = "system"
content = "你是一个链接搜索助手,可以对所提供的链接内容进行搜索并总结"
},
{
role = "user"
content = "${input}"
}]
}
}
}
}
sink {
console {
}
}
编写好了之后,运行一下SeaTunnel。命令为:
docker run --rm -it -v /tmp/job/:/config apache/seatunnel:2.3.12 ./bin/seatunnel.sh -m local -c /config/mysql_llm_2_console.conf
mysql_llm_2_console.conf在物理机中的路径为:/tmp/job/mysql_llm_2_console.conf
运行后的截图如下:

左边是SeaTunnel容器的控制台输出,右边是火山方舟管理控制台的trace日志。
从运行结果上可以看出,SeaTunnel确实成功调用到了LLM,也向SeaTunnel返回了链接对应的总结信息。
但这里有个小问题:在原生的docker容器中如果LLM返回的内容有中文会乱码,暂时没找到在容器器运行的解决办法,这里只关注到能调用到LLM就行。
关于SeaTunnel-Transform的llm详细用法,可参考:https://seatunnel.apache.org/docs/2.3.12/transform-v2/llm
5. SeaTunnel-Transform-Embedding使用示例
5.1 需求描述
假设有以下词组:
香蕉、苹果、葡萄、凤梨、蓝莓、杜蕾斯、冈本、第六感、杰士邦、赤尾、梦龙、哈根达斯、八喜、巧乐兹、可爱多、海底捞、乡村基、肯德基、西贝、大米先生。
输入一个要检索的词组,从以上词组中返回最相似的3个词。
5.2 流程实现
创建 /tmp/job/sql_2_milvus.conf 配置文件,编写SeaTunnel配置内容:
env {
job.mode = "BATCH"
}
source {
FakeSource {
row.num = 20
schema = {
fields {
id = "int"
name = "string"
}
}
rows = [
{fields = [0, "香蕉"], kind = INSERT}
{fields = [1, "苹果"], kind = INSERT}
{fields = [2, "葡萄"], kind = INSERT}
{fields = [3, "凤梨"], kind = INSERT}
{fields = [4, "蓝莓"], kind = INSERT}
{fields = [5, "杜蕾斯"], kind = INSERT}
{fields = [6, "冈本"], kind = INSERT}
{fields = [7, "第六感"], kind = INSERT}
{fields = [8, "杰士邦"], kind = INSERT}
{fields = [9, "赤尾"], kind = INSERT}
{fields = [10, "梦龙"], kind = INSERT}
{fields = [11, "哈根达斯"], kind = INSERT}
{fields = [12, "八喜"], kind = INSERT}
{fields = [13, "巧乐兹"], kind = INSERT}
{fields = [14, "可爱多"], kind = INSERT}
{fields = [15, "海底捞"], kind = INSERT}
{fields = [16, "乡村基"], kind = INSERT}
{fields = [17, "肯德基"], kind = INSERT}
{fields = [18, "西贝"], kind = INSERT}
{fields = [19, "大米先生"], kind = INSERT}
]
plugin_output = "fake"
}
}
transform {
Embedding {
plugin_input = "fake"
model_provider = Zhipu
model = embedding-3
api_key = your_zhipu_api
api_path = "https://open.bigmodel.cn/api/paas/v4/embeddings"
vectorization_fields {
name_vector = name
}
plugin_output = "embedding_output"
}
}
sink {
Milvus {
# Milvus 连接信息
url = "http://192.168.32.244:19530"
token = "root:Milvus"
database = milvus_db_test
}
}
我这里用到的Embedding模型为智谱的向量模型,对应的apiKey自行从它的控制台生成。
编写好了之后,运行SeaTunnel:
docker run --rm -it -v /tmp/job/:/config apache/seatunnel:2.3.12 ./bin/seatunnel.sh -m local -c /config/sql_2_milvus.conf

TIPS:如果在运行过程中报错,并怀疑可能是http请求的参数不正确导致的,可通过在容器内开启config/log4j2_client.properties的debug日志进行分析
从日志的输出中可以看到,成功写入了20行数据。(milvus向量数据库的安装方式,可参考:install_standalone-docker)
安装好milvus之后,需要提前创建好对应的库和collection,代码如下:
from pymilvus import MilvusClient, DataType
client = MilvusClient(
uri="http://192.168.32.244:19530",
token="root:Milvus"
)
client.using_database(
db_name="milvus_db_test"
)
# Create schema
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True,
)
# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="name", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="name_vector", datatype=DataType.FLOAT_VECTOR, dim=2048)
# 3.3. Prepare index parameters
index_params = client.prepare_index_params()
# 3.4. Add indexes
index_params.add_index(
field_name="id",
index_type="AUTOINDEX"
)
index_params.add_index(
field_name="name_vector",
index_type="AUTOINDEX",
metric_type="COSINE" # 度量类型:余弦相似度
)
client.create_collection(
collection_name="fake",
schema=schema,
index_params=index_params
)
res = client.get_load_state(
collection_name="fake"
)
print(res)
访问Milvus的控制台,也可以看到对应的数据统计信息:
5.3 验证
在数据有了后,我们就可以用对SeaTunnel处理后的数据进行检索了。
因为这里使用的向量数据库为Milvus,它也支持类型关系型数据库基于字段的查询,如:
from pymilvus import MilvusClient, DataType
client = MilvusClient(
uri="http://192.168.32.244:19530",
token="root:Milvus"
)
client.using_database(
db_name="milvus_db_test"
)
res = client.get(collection_name="fake",
ids=[0],
output_fields=["id", "name"])
print(res)
执行后,输出的结果为:
data: [“{‘name’: ‘香蕉’, ‘id’: 0}”], extra_info: {}
但这种查询不满足我们的需求:输入一个要检索的词,从原始词组中返回最相似的3个词。
我们需要做的是:需要先对一个词进行向量处理后,再进行最大相似搜索。

将一个词进行向量化表示,这却并不是一个简单的过程。
尽管在Milvus中,它也提供了嵌入函数的功能,通过调用Milvus支持的大模型,也可以让我们实现像操作关系型数据库一样对数据进行检索。其流程为:

但我们上面的场景中,我们使用的是SeaTunnel调用智谱的向量模型实现的向量化表示,所以为了统一,检索前还是需要将先调用智谱的向量模型(embedding-3)将检索内容完成向量化表示之后,再进行检索。
编写搜索代码:
import requests
import json
from pymilvus import MilvusClient, DataType
def get_embedding(token: str, dimensions: int, text: str):
url = "https://open.bigmodel.cn/api/paas/v4/embeddings"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
data = {
"model": "embedding-3",
"input": text,
"dimensions": dimensions
}
response = requests.post(url, headers=headers, data=json.dumps(data))
response.raise_for_status()
res = response.json()
# 直接返回 embedding 数组
return res["data"][0]["embedding"]
def milvus_search(client: MilvusClient,
collection_name: str,
vector_field: str,
query_vector: list,
limit: int = 3,
metric_type: str = "COSINE"):
"""
在 Milvus 中执行向量搜索的封装函数。
"""
res = client.search(
collection_name=collection_name,
anns_field=vector_field,
data=[query_vector],
limit=limit,
search_params={"metric_type": metric_type}
)
return res
# 示例调用
if __name__ == "__main__":
text = "啪啪啪"
token = "your_zhipu_api_key"
query_vector = get_embedding(token, 2048, text)
# 连接 Milvus
client = MilvusClient(
uri="http://192.168.32.244:19530",
token="root:Milvus"
)
client.using_database("milvus_db_test")
# 执行搜索
res = milvus_search(
client=client,
collection_name="fake",
vector_field="name_vector",
query_vector=query_vector,
limit=3
)
print(f"被搜索的关键字为:{text}")
for hits in res:
for hit in hits:
# 假设我们要获取字段 'name'
data = client.get(
collection_name="fake", # 这里可以改成参数化
ids=[hit.id],
output_fields=["name"]
)
print(f"id: {hit.id}, name: {data[0]["name"]}, distance: {hit.distance}")
下面运行几个数据测试一下:
搜索啪啪啪时,返回了:杜蕾斯、杰士邦、香蕉。截图如下:
搜索小龙坎时,返回了:海底捞、西贝、乡村基。截图如下:
搜索丝袜时,返回了:杜蕾斯、杰士邦、葡萄。截图如下:
搜索情趣时,返回了:杜蕾斯、冈本、第六感。截图如下:
搜索荔枝时,返回了:葡萄、凤梨、香蕉。截图如下:
搜索甜的时,返回了:可爱多、巧乐兹、八喜。截图如下:
体验后,只能说非常强!
向量AI搜索,YYDS。
更多推荐


所有评论(0)