1. 简介

seatunnel

SeaTunnel官网所述,SeaTunnel是一个高性能的数据集成平台,还支持大数据源多数据源的数据实时同步。

SeaTunnel is a very easy-to-use, multimodal, ultra-high-performance, distributed data integration platform that supports real-time synchronization of massive data.

当然,它也是开源的,地址为:
https://github.com/apache/seatunnel

2. 安装(docker方式)

SeaTunnel官方提供了以下3种安装方式:

  1. Locally(本地安装)
  2. Docker容器安装
  3. K8s(helm/kubernetes均支持)

这3种安装方式均支持SeaTunnel的集群(Cluster)和本地(Local)方式部署。

在本文中为了演示方便,直接使用Docker容器,且使用LocalMode方式运行。
具体命令参考:https://seatunnel.apache.org/docs/2.3.12/start-v2/docker/#set-up-with-docker-in-local-mode

我这里用到的命令如下:

docker pull apache/seatunnel:2.3.12

TIPS:如果由于网络原因拉取镜像失败,可尝试通过配置docker的网络代理解决。
配置方式为:

sudo mkdir -p /etc/systemd/system/docker.service.d
sudo vi /etc/systemd/system/docker.service.d/proxy.conf

#写入对应的配置项
[Service]
Environment="HTTP_PROXY=socks5://192.168.32.155:7078"
Environment="HTTPS_PROXY=socks5://192.168.32.155:7078"
Environment="NO_PROXY=localhost,127.0.0.1,::1"

#重启docker服务
sudo systemctl daemon-reload
sudo systemctl restart docker

#验证
sudo systemctl show --property=Environment docker

#输出示例
root@ubuntu:~# sudo systemctl show --property=Environment docker
Environment=HTTP_PROXY=socks5://192.168.32.155:7078 HTTPS_PROXY=socks5://192.168.32.155:7078 NO_PROXY=localhost,127.0.0.1,::1

运行示例:

docker run --rm -it apache/seatunnel:2.3.12 ./bin/seatunnel.sh -m local -c config/v2.batch.config.template

示例输出:
out_console

其中的主要配置脚本为:

{
    "env" : {
        "parallelism" : 2,
        "job.mode" : "BATCH",
        "checkpoint.interval" : 10000
    },
    "source" : [
        {
            "parallelism" : 2,
            "plugin_output" : "fake",
            "row.num" : 16,
            "schema" : {
                "fields" : {
                    "name" : "string",
                    "age" : "int"
                }
            },
            "plugin_name" : "FakeSource"
        }
    ],
    "sink" : [
        {
            "plugin_name" : "Console"
        }
    ]
}

作用是将FakeSource的数据输出到Console控制台中。

另外,上面示例中运行的SeaTunnel使用的示例引擎为SeaTunnel Engine,是SeaTunnel中的默认引擎,缩写Zeta(不是阿里的那个分布式事务seata)。
除此之外,它还支持Flink和Spark引擎,本文中不再详细展开。

3. SeaTunnel-Transform

SeaTunnel对于数据的处理主要分为以下3个部分:

Source -> Transform -> Sink

source-transform-sink

其中的Transform则为对数据的处理部分。关于SeaTunnel-Transform的详细介绍可移步:https://seatunnel.apache.org/docs/2.3.12/transform-v2

随着这几年AI相关的技术的火热,在SeaTunnel的最近迭代中,SeaTunnel在Transform部分也对AI相关的技术栈进行了支持。如借助LLM和Embedding对数据进行处理。

下面就以LLM和Embedding为例,看下在SeaTunnel中是如何使用它们的。

4. SeaTunnel-Transform-LLM使用示例

假设以下场景:

存在一个mysql的数据表,根据表格中的链接,分析出此链接的具体内容是什么,并进行一个总结,并将总结的内容输出到控制台中。

表格内容如下:

id link
1 https://blog.csdn.net/puhaiyang/article/details/154215171
2 https://blog.csdn.net/puhaiyang/article/details/150501152
3 https://blog.csdn.net/puhaiyang/article/details/149770381
n ……

整体流程为:
[mysql-table] -> [SeaTunnel-Trasform-LLM] -> [console]

这里以OpenAi的LLM为例,演示一下实现过程。

编写一个SeaTunnel配置脚本,名为mysql_llm_2_console.conf,内容如下:

env {
  parallelism = 1
  job.mode = "BATCH"
  read_limit.rows_per_second = 10
}

source {
    Jdbc {
        url = "jdbc:mysql://192.168.31.174:3306/testdb?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        username = "root"
        password = "123456"
        query = "select link from articles"
    }
}

transform {
  LLM {
	model_provider = CUSTOM
    model = bot-202512_YOUR_BOT_ID
	api_path = "https://ark.cn-beijing.volces.com/api/v3/bots/chat/completions"
    custom_config={
			custom_response_parse = "$.choices[*].message.content"
            custom_request_headers = {
                Content-Type = "application/json; charset=UTF-8"
                Authorization = "Bearer YOUR_API_KEY"            
            }
            custom_request_body ={
                model = "${model}"
                messages = [
                {
                    role = "system"
                    content = "你是一个链接搜索助手,可以对所提供的链接内容进行搜索并总结"
                },
                {
                    role = "user"
                    content = "${input}"
                }]
            }
        }
  }
}

sink {
 console {
 }
}

编写好了之后,运行一下SeaTunnel。命令为:

docker run --rm -it -v /tmp/job/:/config apache/seatunnel:2.3.12 ./bin/seatunnel.sh -m local -c /config/mysql_llm_2_console.conf

mysql_llm_2_console.conf在物理机中的路径为:/tmp/job/mysql_llm_2_console.conf

运行后的截图如下:

result

左边是SeaTunnel容器的控制台输出,右边是火山方舟管理控制台的trace日志。

从运行结果上可以看出,SeaTunnel确实成功调用到了LLM,也向SeaTunnel返回了链接对应的总结信息。

但这里有个小问题:在原生的docker容器中如果LLM返回的内容有中文会乱码,暂时没找到在容器器运行的解决办法,这里只关注到能调用到LLM就行。

关于SeaTunnel-Transform的llm详细用法,可参考:https://seatunnel.apache.org/docs/2.3.12/transform-v2/llm

5. SeaTunnel-Transform-Embedding使用示例

5.1 需求描述

假设有以下词组:
香蕉、苹果、葡萄、凤梨、蓝莓、杜蕾斯、冈本、第六感、杰士邦、赤尾、梦龙、哈根达斯、八喜、巧乐兹、可爱多、海底捞、乡村基、肯德基、西贝、大米先生。

输入一个要检索的词组,从以上词组中返回最相似的3个词。

5.2 流程实现

创建 /tmp/job/sql_2_milvus.conf 配置文件,编写SeaTunnel配置内容:

env {
  job.mode = "BATCH"
}

source {
  FakeSource {
    row.num = 20
    schema = {
      fields {
        id = "int"
        name = "string"
      }
    }

    rows = [
      {fields = [0, "香蕉"], kind = INSERT}
      {fields = [1, "苹果"], kind = INSERT}
      {fields = [2, "葡萄"], kind = INSERT}
      {fields = [3, "凤梨"], kind = INSERT}
      {fields = [4, "蓝莓"], kind = INSERT}
      {fields = [5, "杜蕾斯"], kind = INSERT}
      {fields = [6, "冈本"], kind = INSERT}
      {fields = [7, "第六感"], kind = INSERT}
      {fields = [8, "杰士邦"], kind = INSERT}
      {fields = [9, "赤尾"], kind = INSERT}
      {fields = [10, "梦龙"], kind = INSERT}
      {fields = [11, "哈根达斯"], kind = INSERT}
      {fields = [12, "八喜"], kind = INSERT}
      {fields = [13, "巧乐兹"], kind = INSERT}
      {fields = [14, "可爱多"], kind = INSERT}
      {fields = [15, "海底捞"], kind = INSERT}
      {fields = [16, "乡村基"], kind = INSERT}
      {fields = [17, "肯德基"], kind = INSERT}
      {fields = [18, "西贝"], kind = INSERT}
      {fields = [19, "大米先生"], kind = INSERT}
    ]

    plugin_output = "fake"
  }
}

transform {
  Embedding {
    plugin_input = "fake"
    model_provider = Zhipu
    model = embedding-3
    api_key = your_zhipu_api
    api_path = "https://open.bigmodel.cn/api/paas/v4/embeddings"
    vectorization_fields {
        name_vector = name
    }
    plugin_output = "embedding_output"
  }
}


sink {
  Milvus {
    # Milvus 连接信息
    url = "http://192.168.32.244:19530"
    token = "root:Milvus"
    database = milvus_db_test
  }
}

我这里用到的Embedding模型为智谱的向量模型,对应的apiKey自行从它的控制台生成。

编写好了之后,运行SeaTunnel:

docker run --rm -it -v /tmp/job/:/config apache/seatunnel:2.3.12 ./bin/seatunnel.sh -m local -c /config/sql_2_milvus.conf

job_result

TIPS:如果在运行过程中报错,并怀疑可能是http请求的参数不正确导致的,可通过在容器内开启config/log4j2_client.properties的debug日志进行分析

从日志的输出中可以看到,成功写入了20行数据。(milvus向量数据库的安装方式,可参考:install_standalone-docker

安装好milvus之后,需要提前创建好对应的库和collection,代码如下:

from pymilvus import MilvusClient, DataType

client = MilvusClient(
    uri="http://192.168.32.244:19530",
    token="root:Milvus"
)

client.using_database(
    db_name="milvus_db_test"
)

# Create schema
schema = MilvusClient.create_schema(
    auto_id=False,
    enable_dynamic_field=True,
)

# Add fields to schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="name", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="name_vector", datatype=DataType.FLOAT_VECTOR, dim=2048)

# 3.3. Prepare index parameters
index_params = client.prepare_index_params()

# 3.4. Add indexes
index_params.add_index(
    field_name="id",
    index_type="AUTOINDEX"
)

index_params.add_index(
    field_name="name_vector",
    index_type="AUTOINDEX",
    metric_type="COSINE"  # 度量类型:余弦相似度
)

client.create_collection(
    collection_name="fake",
    schema=schema,
    index_params=index_params
)

res = client.get_load_state(
    collection_name="fake"
)

print(res)

访问Milvus的控制台,也可以看到对应的数据统计信息:
milvus-console

5.3 验证

在数据有了后,我们就可以用对SeaTunnel处理后的数据进行检索了。

因为这里使用的向量数据库为Milvus,它也支持类型关系型数据库基于字段的查询,如:

from pymilvus import MilvusClient, DataType

client = MilvusClient(
    uri="http://192.168.32.244:19530",
    token="root:Milvus"
)

client.using_database(
    db_name="milvus_db_test"
)

res = client.get(collection_name="fake",
                 ids=[0],
                 output_fields=["id", "name"])
print(res)

执行后,输出的结果为:

data: [“{‘name’: ‘香蕉’, ‘id’: 0}”], extra_info: {}

但这种查询不满足我们的需求:输入一个要检索的词,从原始词组中返回最相似的3个词。

我们需要做的是:需要先对一个词进行向量处理后,再进行最大相似搜索。

embedding-search

将一个词进行向量化表示,这却并不是一个简单的过程。

尽管在Milvus中,它也提供了嵌入函数的功能,通过调用Milvus支持的大模型,也可以让我们实现像操作关系型数据库一样对数据进行检索。其流程为:

embedding-query

但我们上面的场景中,我们使用的是SeaTunnel调用智谱的向量模型实现的向量化表示,所以为了统一,检索前还是需要将先调用智谱的向量模型embedding-3)将检索内容完成向量化表示之后,再进行检索。

编写搜索代码:

import requests
import json

from pymilvus import MilvusClient, DataType


def get_embedding(token: str, dimensions: int, text: str):
    url = "https://open.bigmodel.cn/api/paas/v4/embeddings"

    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }

    data = {
        "model": "embedding-3",
        "input": text,
        "dimensions": dimensions
    }

    response = requests.post(url, headers=headers, data=json.dumps(data))
    response.raise_for_status()

    res = response.json()

    # 直接返回 embedding 数组
    return res["data"][0]["embedding"]


def milvus_search(client: MilvusClient,
                  collection_name: str,
                  vector_field: str,
                  query_vector: list,
                  limit: int = 3,
                  metric_type: str = "COSINE"):
    """
    在 Milvus 中执行向量搜索的封装函数。
    """
    res = client.search(
        collection_name=collection_name,
        anns_field=vector_field,
        data=[query_vector],
        limit=limit,
        search_params={"metric_type": metric_type}
    )

    return res


# 示例调用
if __name__ == "__main__":
    text = "啪啪啪"
    token = "your_zhipu_api_key"

    query_vector = get_embedding(token, 2048, text)

    # 连接 Milvus
    client = MilvusClient(
        uri="http://192.168.32.244:19530",
        token="root:Milvus"
    )
    client.using_database("milvus_db_test")

    # 执行搜索
    res = milvus_search(
        client=client,
        collection_name="fake",
        vector_field="name_vector",
        query_vector=query_vector,
        limit=3
    )

    print(f"被搜索的关键字为:{text}")
    for hits in res:
        for hit in hits:
            # 假设我们要获取字段 'name'
            data = client.get(
                collection_name="fake",  # 这里可以改成参数化
                ids=[hit.id],
                output_fields=["name"]
            )
            print(f"id: {hit.id}, name: {data[0]["name"]}, distance: {hit.distance}")

下面运行几个数据测试一下:

搜索啪啪啪时,返回了:杜蕾斯、杰士邦、香蕉。截图如下:
papapa

搜索小龙坎时,返回了:海底捞、西贝、乡村基。截图如下:
xiaolongkan

搜索丝袜时,返回了:杜蕾斯、杰士邦、葡萄。截图如下:
siwa

搜索情趣时,返回了:杜蕾斯、冈本、第六感。截图如下:
qingqu

搜索荔枝时,返回了:葡萄、凤梨、香蕉。截图如下:
lizhi

搜索甜的时,返回了:可爱多、巧乐兹、八喜。截图如下:
tiande

体验后,只能说非常强!

向量AI搜索,YYDS。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐