SpringBoot+MyBatis流式查询,处理大规模数据,提高系统的性能和响应能力
SpringBoot+MyBatis流式查询,处理大规模数据,提高系统的性能和响应能力
推荐大家关注一个公众号

推荐文章:
1、springBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表;
2、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据;
3、SpringBoot使用@Async实现多线程异步。
一、简介

在大量数据的时代,处理海量数据成为了许多应用的核心需求。而在Spring Boot与MyBatis的结合中,实现流式的查询可以有效地处理大规模的数据,并提高系统的性能和响应能力。若没有流式查询,想要从数据库取1000 万条数据而又没有足够的内存时,分页查询效率又取决于表设计,若设计不好,就无法执行高效的分页查询。因此流式查询是一个数据库访问框架必须具备的功能。
1.1、什么是mybatis的流式查询?
流式查询是一种分批次逐行读取数据的方式,是一个迭代器,可以通过遍历迭代器来取出结果集,而不是一次性将所有数据加载到内存中。通过流式查询,我们可以遍历迭代器来取出结果集,避免一次性取出大量的数据而占用太多的内存,提高系统的性能和可扩展性。
1.2、Cursor介绍
org.apache.ibatis.cursor.Cursor接口有三个抽象方法,分别是
1、isOpen() :判断cursor是否正处于打开状态;
2、isConsumed() :判断查询结果是否全部读取完;
3、getCurrentIndex() :查询已读取数据在全部数据里的索引位置;
public interface Cursor<T> extends Closeable, Iterable<T> {//判断cursor是否正处于打开状态//当返回true,则表示cursor已经开始从数据库里刷新数据了;boolean isOpen();//判断查询结果是否全部读取完;//当返回true,则表示查询sql匹配的全部数据都消费完了;boolean isConsumed();//查询已读取数据在全部数据里的索引位置;//第一条数据的索引位置为0;当返回索引位置为-1时,则表示已经没有数据可以读取;int getCurrentIndex();}
二、实现流式查询的步骤
mybatis的流式查询,就是服务端程序查询数据的过程中,与远程数据库一直保持连接,不断地去数据库拉取数据,提交事务并关闭sqlsession后,数据库连接断开,停止数据拉取,需要注意的是使用这种方式,需要自己手动维护sqlsession和事务的提交。
2.1、mapper层接口
/*** 功能描述:采用流式查询limit条学生信息* @MethodName: findStudentByScan* @MethodParam: [limit]* @Return: org.apache.ibatis.cursor.Cursor<com.wonders.entity.Student>* @Author: yyalin* @CreateDate: 2022/3/9 13:48*/@Select("select t.s_id AS studentId , t.student_name, t.age, t.phone, t.addr from student t limit #{limit}")@Options(fetchSize = 2)Cursor<Student> findStudentByScan(@Param("limit") long limit);
2.2、service层实现接口
2.2.1、@Transactional注解方式
@Transactionalpublic List<Student> findStudentByScan(long limit) throws Exception {List<Student> students=new ArrayList<>();log.info("入参limit:"+limit);//缺点:注意 Spring 框架当中注解使用的坑:只在外部调用时生效。// 在当前类中调用这个方法,依旧会报错。Cursor<Student> cursor2 =studentMapper.findStudentByScan(limit);//Cursor 实现了迭代器接口,因此在实际使用当中,从 Cursor 取数据非常简单cursor2.forEach(stu -> {log.info("游标的当前索引CurrentIndex:"+cursor2.getCurrentIndex());log.info("用于在取数据之前判断 Cursor 对象是否是打开状态。" +"只有当打开时 Cursor 才能取数据;:"+cursor2.isOpen()); //:truelog.info("用于判断查询结果是否全部取完:"+cursor2.isConsumed()); //falselog.info("输出的信息:"+ JSON.toJSONString(stu));//可以对数据进行处理,主要用于处理大批量的数据students.add(stu);});log.info("返回多少条数据:"+(cursor2.getCurrentIndex()+1)); //返回多少条数据:需要加1才是返回总数log.info("用于在取数据之前判断 Cursor 对象是否是打开状态。" +"只有当打开时 Cursor 才能取数据2;:"+cursor2.isOpen()); //:falselog.info("用于判断查询结果是否全部取完2:"+cursor2.isConsumed()); //trueif(cursor2.isConsumed()){return students;}return students;}
注意:Spring 框架当中注解使用的坑:只在外部调用时生效。在当前类中调用这个方法,依旧会报错。
日志输出:

2.2.2、SqlSessionFactory方式
//方案一:SqlSessionFactory方式public List<Student> findStudentByScan01(long limit) throws Exception {List<Student> students=new ArrayList<>();log.info("入参limit:"+limit);//方案一:SqlSessionFactory方式// 创建sqlSessionFactorySqlSessionFactory sqlSessionFactory = ApplicationContextUtils.getBean("sqlSessionFactory");//建立数据库连接,并保证最后可以关闭SqlSession sqlSession = sqlSessionFactory.openSession();//保证得到的 Cursor 对象是打开状态的Cursor<Student> cursor = sqlSession.getMapper(StudentMapper.class).findStudentByScan(limit);cursor.forEach(stu -> {//对获取到的数据进行处理stu.getAge();});log.info("返回多少条数据:"+(cursor.getCurrentIndex()+1)); //返回多少条数据:需要加1才是返回总数log.info("用于在取数据之前判断 Cursor 对象是否是打开状态。" +"只有当打开时 Cursor 才能取数据;:"+cursor.isOpen()); //:falselog.info("用于判断查询结果是否全部取完:"+cursor.isConsumed()); //trueif(cursor.isConsumed()){// 关闭资源sqlSession.close();return students;}return students;}
日志输出:

2.2.3、ApplicationContextUtils从容器中获取bean
package com.wonders.common.utils;import org.springframework.beans.BeansException;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.stereotype.Component;/*** @author yyalin* @desc:此类的用途:使用ApplicationContextUtils工具类 在spring容器中获取beans* @create 2022-1-19* @Version: V1.0*/@Componentpublic class ApplicationContextUtils implements ApplicationContextAware {private static ApplicationContext applicationContext;/*** 功能描述:实现ApplicationContextAware接口的context注入函数,* 将其存入静态变量applicationContext中* @MethodName: setApplicationContext* @MethodParam: [ctx]* @Author: yyalin* @CreateDate: 2022-1-19 13:54*/@Overridepublic void setApplicationContext(ApplicationContext ctx) throws BeansException {applicationContext = ctx;}/*** 功能描述:取得存储在静态变量中的ApplicationContext* @MethodName: getApplicationContext* @Return: org.springframework.context.ApplicationContext* @Author: yyalin* @CreateDate: 2022-1-19 16:38*/public static ApplicationContext getApplicationContext() {return applicationContext;}/*** 功能描述:从静态变量ApplicationContext中取得Bean, 自动转型为所赋值对象的类型* @MethodName: getBean* @MethodParam: [clazz]* @Author: yyalin* @CreateDate: 2022-1-19 17:13*/public static <T> T getBean(Class<T> clazz) {return applicationContext.getBean(clazz);}/*** 功能描述:从静态变量ApplicationContext中取得Bean, 自动转型为所赋值对象的类型* @MethodName: getBean* @MethodParam: [name]* @Author: yyalin* @CreateDate: 2022-1-19 16:45*/@SuppressWarnings("unchecked")public static <T> T getBean(String name) {return (T) applicationContext.getBean(name);}}
三、流式查询的优势
-
减少内存消耗:流式查询不会一次性加载所有数据到内存中,而是逐行读取数据,减少了内存的占用,特别适用于大数据量的场景。
-
提高系统性能:流式查询可以避免一次性加载大量数据导致的性能问题,通过分批次处理数据,提高系统的响应能力。
-
支持大数据处理:对于需要处理大规模数据的应用,流式查询可以有效地处理海量数据,提供高效的数据处理能力。
四、注意事项
mybatis的流式查询的本意,是避免大量数据的查询而导致内存溢出,因此dao层查询返回的是一个迭代器(Cursor),可以每次从迭代器中取出一条查询结果,在实际业务开发过程中,即是根据实际的jvm内存大小,从迭代器中取出一定数量的数据后,再进行数据处理,待处理完之后,继续取出一定数据再处理,以此类推直到全部数据处理完,这样做的最大好处就是能够降低内存使用和垃圾回收器的负担,使数据处理的过程相对更加高效、可控,内存溢出的风险较小;
好处很明显,缺点也很就明显,处理的时间可能会变长,需要引入多线程异步操作,并且在迭代器遍历和数据处理的过程中,数据库连接不能断开,即当前sqlSession要保持持续打开状态,一量断开,数据读取就会中断,所以关于这块的处理,使用mybatis原生的sqlSession进行手动查询、提交事务、回滚和关闭sqlSession最为稳妥、最简单。
更多详细资料,请关注个人微信公众号或搜索“程序猿小杨”添加。
参考:
https://mp.weixin.qq.com/s/bpcdaGR4rPOroMUnv7PtgA
更多推荐
所有评论(0)