KingbaseES基础(三):存储过程编写 —— AI 数据清洗逻辑数据库侧实现

——把脏活交给数据库,让 Java 专注智能

大家好,我是那个总在 ETL 脚本里写正则、又在凌晨三点调试数据漂移的老架构。今天不聊模型训练,也不谈特征工程——我们解决一个更底层但关键的问题:

当你每天从 IoT 设备、App 埋点、业务系统涌入上百万条原始日志,是该用 Java 写清洗服务,还是直接在电科金仓 KingbaseES(KES) 里用存储过程处理?

很多人一听“存储过程”,立刻想到“过时”、“难维护”、“违背微服务”。
但现实是:在高吞吐、低延迟、强一致的 AI 数据管道中,把清洗逻辑下沉到数据库,往往是最高效、最可靠的选择

为什么?因为数据不动,计算靠近——这是分布式系统的基本原则。

今天我们就用 KES 的 PL/SQL 兼容能力,手把手写一个 AI 特征清洗存储过程,让它自动过滤无效记录、标准化字段、生成衍生标签。


一、为什么 AI 清洗适合放数据库侧?

先说清楚适用场景。不是所有逻辑都该放存储过程,但以下几类非常适合:

  • 规则明确、高频执行:如过滤测试账号、修正时间戳、补全缺失值;
  • 数据量大、网络成本高:避免把 GB 级原始数据拉到应用层再丢弃 90%;
  • 强事务性要求:清洗 + 写入需原子完成,不能半途失败。

举个真实例子:某省级医保平台用 KES 存储千万级就诊记录,每天需清洗异常诊断码。若用 Java 拉取再处理,网络带宽打满;改用存储过程后,清洗时间从 47 分钟降到 6 分钟,且零数据丢失

而 KES 的优势在于:完整兼容 Oracle PL/SQL 语法详见兼容方案),让你复用现有 DBA 技能,无需学习新语言。


二、实战:编写清洗存储过程

假设我们有一张原始表 raw_events,结构如下:

CREATE TABLE raw_events (
    event_id    VARCHAR(64),
    user_id     VARCHAR(64),
    payload     JSONB,
    created_at  TIMESTAMP WITH TIME ZONE,
    status      VARCHAR(20) DEFAULT 'pending'  -- 'pending', 'cleaned', 'invalid'
);

目标:将有效事件清洗后写入 ai_features.user_behavior,无效的标记为 'invalid'

步骤 1:创建清洗存储过程

CREATE OR REPLACE PROCEDURE clean_ai_events(batch_size INT DEFAULT 10000)
LANGUAGE plpgsql
AS $$
DECLARE
    cleaned_count INT := 0;
    invalid_count INT := 0;
    rec RECORD;
BEGIN
    -- 游标遍历 pending 记录
    FOR rec IN 
        SELECT event_id, user_id, payload, created_at 
        FROM raw_events 
        WHERE status = 'pending' 
        LIMIT batch_size
    LOOP
        BEGIN
            -- 规则 1: user_id 必须非空且非测试账号
            IF rec.user_id IS NULL OR rec.user_id LIKE 'TEST_%' THEN
                UPDATE raw_events SET status = 'invalid' WHERE event_id = rec.event_id;
                invalid_count := invalid_count + 1;
                CONTINUE;
            END IF;

            -- 规则 2: payload 必须包含 action 和 item_id
            IF NOT (rec.payload ? 'action') OR NOT (rec.payload ? 'item_id') THEN
                UPDATE raw_events SET status = 'invalid' WHERE event_id = rec.event_id;
                invalid_count := invalid_count + 1;
                CONTINUE;
            END IF;

            -- 规则 3: 时间不能是未来
            IF rec.created_at > NOW() THEN
                UPDATE raw_events SET status = 'invalid' WHERE event_id = rec.event_id;
                invalid_count := invalid_count + 1;
                CONTINUE;
            END IF;

            -- 有效记录:插入特征表
            INSERT INTO ai_features.user_behavior (
                user_id, event_time, event_type, item_id, raw_payload
            ) VALUES (
                rec.user_id,
                rec.created_at,
                rec.payload->>'action',
                rec.payload->>'item_id',
                rec.payload
            );

            -- 标记为已清洗
            UPDATE raw_events SET status = 'cleaned' WHERE event_id = rec.event_id;
            cleaned_count := cleaned_count + 1;

        EXCEPTION WHEN OTHERS THEN
            -- 捕获解析异常(如 JSON 格式错)
            UPDATE raw_events SET status = 'invalid' WHERE event_id = rec.event_id;
            invalid_count := invalid_count + 1;
        END;
    END LOOP;

    -- 日志输出(可通过 KES 日志查看)
    RAISE NOTICE '本次清洗: 有效 % 条, 无效 % 条', cleaned_count, invalid_count;
END;
$$;

✅ 关键设计:

  • 批处理:每次只处理 batch_size 条,避免长事务锁表;
  • 原子性:每条记录独立判断,失败不影响其他;
  • 状态追踪:通过 status 字段实现幂等重试;
  • 异常安全EXCEPTION 块捕获 JSON 解析等错误。

三、调用与调度:让清洗自动化

手动调用(调试用)

CALL clean_ai_events(5000);

定时调度(生产用)

KES 支持通过 外部调度器(如 Linux cron 或 KOPS 运维平台)定期触发:

# 每 5 分钟清洗一次
*/5 * * * * /opt/Kingbase/ES/V9/bin/ksql -U ai_writer -d ai_db -c "CALL clean_ai_events(10000);"

💡 高级方案:结合 KES 的 异步任务框架(需 V9R2+),可实现队列驱动清洗。


四、与 Java 协同:分层职责清晰

这并不意味着 Java 无事可做。合理的分工应是:

  • 数据库侧:做 确定性、规则型 清洗(如格式校验、字段补全);
  • Java 侧:做 复杂、模型驱动 的处理(如 embedding 生成、异常检测)。

例如:

// Java 调用存储过程(初始化清洗)
try (CallableStatement cs = conn.prepareCall("{call clean_ai_events(?)}")) {
    cs.setInt(1, 10000);
    cs.execute();
}

// 然后从 cleaned 表读取数据,送入 DL4J 模型
List<UserFeature> features = queryCleanedFeatures(conn);
float[][] embeddings = embeddingModel.infer(features);
saveEmbeddingsToKES(conn, embeddings);

这样,数据库负责“干净数据”,Java 负责“智能计算”,各司其职。

确保你的 JDBC 驱动支持 CallableStatement:下载最新版


五、性能与安全提醒

  • 避免全表扫描:确保 raw_events(status) 有索引;
    CREATE INDEX idx_raw_status ON raw_events (status);
    
  • 控制事务大小:不要一次处理 100 万条,分批提交;
  • 权限最小化:清洗用户 ai_writer 只应有 raw_eventsuser_behavior 的写权限;
  • 监控日志:通过 RAISE NOTICE 输出关键指标,接入运维平台。

结语:智能在应用,效率在数据库

AI 的未来不在“把所有逻辑塞进模型”,而在 构建高效、可靠、可演进的数据基础设施

电科金仓的 KES,通过强大的 PL/SQL 兼容能力,让你能把确定性的数据清洗工作,安全、高效地卸载到数据库侧,从而释放 Java 应用的计算资源,专注于真正的智能任务。

下一期,我们会讲:KingbaseES 分区表实战 —— 管理 TB 级 AI 日志数据
敬请期待。

—— 一位相信“好的架构,是让每个组件做它最擅长的事”的架构师

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐