一、问题背景

大模型项目中需要使用知识库,业务数据需要向量计算后入库向量库,当批量入库某类业务向量数据时发生了报错信息:

com.alibaba.fastjson.JSONException: syntax error, expect {, actual string, fieldName data, pos 5005, line 1, column 5006{"status":500,"code":500,"success":false,"data":"org.springframework.web.util.NestedServletException: Handler dispatch failed; nested exception is java.lang.OutOfMemoryError: Java heap space\n\tat org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1087)\n\tat org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:965)\n\tat org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)\n\tat org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)\n\tat javax.servlet.http.HttpServlet.service(HttpServlet.java:555)\n\tat

.

.

.

ConnectionHandler.process(AbstractProtocol.java:926)\n\tat org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1791)\n\tat org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52)\n\tat org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)\n\tat org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)\n\tat org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)\n\tat java.lang.Thread.run(Thread.java:750)\nCaused by: java.lang.OutOfMemoryError: Java heap space\n","msg":"服务器异常"}

二、问题分析

初看日志报错,OutOfMemoryError, 内存溢出了, 于是想到的是调整JVM 参数,将对内存调大,但是调整之后依旧如此。

想着之前在开发环境都已经测试过数据,而且比这个测试环境的数据量更大,并没有出现类似问题,然后就开始了怀疑和排查其他因素。

对比了开发服务器上的微服务JVM 配置也是一样,然后居然开始怀疑是不是测试服务器本身内存就不够的问题,看了一下确实比开发服务器小,但是内存尚有剩余,不至于出现OOM。


断点调试确认是向量数据入库的问题:
 

public <T> InsertResp insertRespWithDatabase(List<T> entities, String collectionName, String database, String embeddingSignage) {
    AtomicLong InsertCnt = new AtomicLong();
    List<Object> primaryKeys = Lists.newArrayList();
    DescribeCollectionResp describeCollection = milvusService.describeCollection(collectionName, database);
    // 获取向量字段名称
    List<String> vectorFieldNames = describeCollection.getVectorFieldNames();
    // 将entities按1000条截取,每一千条执行一次批量插入
    List<List<T>> partition = Lists.partition(entities, 1000);

    partition.forEach(list -> {
        Insert insert = new Insert();
        insert.setCollectionName(collectionName);

        List<JSONObject> jsonObjects = JSONArray.parseArray(JSON.toJSONString(list), JSONObject.class);
        vectorFieldNames.stream().forEach(fieldName -> {
            // 计算向量值
            List<String> fieldValues = jsonObjects.stream().map(o -> o.getString(fieldName)).collect(Collectors.toList());
            List<Float[]> vectors = milvusService.embeddings(collectionName, fieldValues, embeddingSignage);
            IntStream.range(0, vectors.size()).forEach(index -> jsonObjects.get(index).put(fieldName, vectors.get(index)));
        });

        // 只转换当前批次的数据
        List<JsonObject> batchData = jsonObjects.stream().map(e -> convert(e, vectorFieldNames)).collect(Collectors.toList());
        insert.setData(batchData);

        try {
            InsertResp insertResp = milvusService.insert(insert, database);
            if(insertResp.getInsertCnt() < batchData.size()) {
                log.error("插入数据丢失,传入数据数量为{},插入数量为{}", batchData.size(), insertResp.getInsertCnt());
            } else {
                log.info("插入数据成功,插入数量为{}", insertResp.getInsertCnt());
            }
            InsertCnt.addAndGet(insertResp.getInsertCnt());
            primaryKeys.addAll(insertResp.getPrimaryKeys());
        } catch(Exception e) {
            // 方案1: 简化日志记录,避免序列化复杂对象
            log.error("同步失败,批次数据条数:{},异常信息:{}", batchData.size(), e.getMessage(), e);
        }
    });

    return InsertResp.builder().InsertCnt(InsertCnt.get()).primaryKeys(primaryKeys).build();
}

并且将批量的1000 改到300 就不出现了,所以确实是因为占用内存过大了,导致不够用。而且在断点的时候也没有出现,因为断点给了GC 的时间。

将代码丢给AI 分析,说这段代码确实会有内存炸弹

最后建议修改代码,不要用流式处理以及即使释放对象内存:

 public <T> InsertResp insertRespWithDatabase(List<T> entities, String collectionName, String database, String embeddingSignage) {
        AtomicLong InsertCnt = new AtomicLong();
        List<Object> primaryKeys = Lists.newArrayList();
        DescribeCollectionResp describeCollection = milvusService.describeCollection(collectionName, database);
        // 获取向量字段名称
        List<String> vectorFieldNames = describeCollection.getVectorFieldNames();

        AtomicLong totalInsertCnt = new AtomicLong(0);
        List<Object> totalPrimaryKeys = new ArrayList<>(entities.size() / 1000 * 2); // 预分配容量,减少扩容
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        // 3. 分批处理:每批 1000 条(可根据内存调整为 500)
        int batchSize = 1000;
        for (int i = 0; i < entities.size(); i += batchSize) {
            // 限定批次范围,避免越界
            int endIndex = Math.min(i + batchSize, entities.size());
            List<T> batchEntities = entities.subList(i, endIndex);
            List<Object> batchPrimaryKeys = new ArrayList<>(batchSize); // 批次内主键,避免全局累积压力

            try {
                // ========== 核心优化1:减少 JSON 序列化/反序列化 ==========
                List<JSONObject> jsonObjects = new ArrayList<>(batchSize);
                for (T entity : batchEntities) {
                    // 直接转 JSONObject,避免先转 String 再解析(减少临时字符串)
                    JSONObject json = (entity instanceof JSONObject) ? (JSONObject) entity : JSON.parseObject(JSON.toJSONString(entity));
                    jsonObjects.add(json);
                }

                // ========== 核心优化2:向量计算 + 内存复用 ==========
                for (String fieldName : vectorFieldNames) {
                    // 提取字段值(直接遍历,避免 stream 中间对象)
                    List<String> fieldValues = new ArrayList<>(batchSize);
                    for (JSONObject json : jsonObjects) {
                        fieldValues.add(json.getString(fieldName));
                    }

                    // 计算向量(返回 Float[] 改为 float[] 更省内存,若 milvusService 支持)
                    List<Float[]> vectors = milvusService.embeddings(collectionName, fieldValues, embeddingSignage);

                    // 回填向量值(直接遍历,减少 stream 开销)
                    for (int j = 0; j < vectors.size(); j++) {
                        jsonObjects.get(j).put(fieldName, vectors.get(j));
                    }

                    // 释放向量列表内存
                    vectors.clear();
                    fieldValues.clear();
                }

                // ========== 核心优化3:转换插入数据 + 及时释放 ==========
                List<JsonObject> batchData = new ArrayList<>(batchSize);
                for (JSONObject json : jsonObjects) {
                    batchData.add(convert(json, vectorFieldNames));
                }

                // ========== 执行插入 ==========
                Insert insert = new Insert();
                insert.setCollectionName(collectionName);
                insert.setData(batchData);

                InsertResp insertResp = milvusService.insert(insert, database);
                long insertCnt = insertResp.getInsertCnt();
                totalInsertCnt.addAndGet(insertCnt);

                // 收集主键(批次内先存储,避免全局列表频繁扩容)
                batchPrimaryKeys.addAll(insertResp.getPrimaryKeys());
                totalPrimaryKeys.addAll(batchPrimaryKeys);

                // 日志输出(简化日志内容,减少字符串拼接)
                if (insertCnt < batchData.size()) {
                    log.error("批次[{} - {}]插入数据丢失,传入{}条,成功{}条", i, endIndex - 1, batchData.size(), insertCnt);
                } else {
                    log.info("批次[{} - {}]插入成功,共{}条", i, endIndex - 1, insertCnt);
                }

                // ========== 核心优化4:手动释放当前批次内存 ==========
                jsonObjects.clear();
                batchData.clear();
                batchPrimaryKeys.clear();
            } catch (Exception e) {
                log.error("批次[{} - {}]插入失败,异常:{}", i, endIndex - 1, e.getMessage(), e);
                // 可选:抛出异常终止插入,或跳过当前批次继续
                // throw new RuntimeException("批量插入失败", e);
            }
        }
            stopWatch.stop();
            log.info("所有批次插入完成,总数据量{}条,成功插入{}条,耗时{}ms",
                    entities.size(), totalInsertCnt.get(), stopWatch.getTotalTimeMillis());

            // 4. 返回结果(避免创建大对象)
            return InsertResp.builder()
                    .InsertCnt(totalInsertCnt.get())
                    .primaryKeys(totalPrimaryKeys)
                    .build();
    }
 

代码改成如上之后,确实可以一次执行1000 的插入也没有OOM 了,本来想就这么改算了,还有就是将一次处理的数量改小。 

但是心中始终有个不解的疑惑,为啥之前都正常,现在在测试环境就有问题,这点数据量应该不至于耗费几个G 的内存,将微服务的最大堆内存调整到了8G 依旧报错,用原先的代码。

于是自己又对准了原先的开发环境调试,也出现问题了,疑惑更加来了。

在几次本地和服务器的调试中,偶然发现了只要用了本地的另外一个微服务就都正常了。仔细意向,原来这个向量计算是通过SDK 接口调用了另外的微服务,最终发现这个OOM 是由向量计算的微服务跑出来的,其xmx 配置为 512M ,

真相终于浮出水面。

就这么个问题又是让自己查了大半天,root cause 有时候还是要费点心思, 不然心中的疑云不会消散。

另外就是写java 代码还是要注意一下内存的使用,不用真以为GC 是万能的。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐