mysql JDBC的三种查询(普通、流式、游标)

这篇具有很好参考价值的文章主要介绍了mysql JDBC的三种查询(普通、流式、游标)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

使用JDBC向mysql发送查询时,有三种方式:

  • 常规查询:JDBC驱动会阻塞的一次性读取全部查询的数据到 JVM 内存中,或者分页读取
  • 流式查询:每次执行rs.next时会判断数据是否需要从mysql服务器获取,如果需要触发读取一批数据(可能n行)加载到 JVM 内存进行业务处理
  • 游标查询:通过 fetchSize 参数,控制每次从mysql服务器一次读取多少行数据。

1、常规查询

public static void normalQuery() throws SQLException {
    Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3307/test?useSSL=false", "root", "123456");
    PreparedStatement statement = connection.prepareStatement(sql);
    //statement.setFetchSize(100); //不起作用
    ResultSet resultSet = statement.executeQuery();
    
    while(resultSet.next()){
        System.out.println(resultSet.getString(2));
    }
    resultSet.close();
    statement.close();
    connection.close();
}

1)说明:

  1. 第四行设置featchSize不起作用。
  2. 第五行statement.executeQuery()执行查询会阻塞,因为需要等到所有数据返回并放到内存中;接下来每次执行resultSet.next()方法会从内存中获取数据。

2)将jvm内存设置较小(-Xms16m -Xmx16m),对于大数据的查询会产生OOM:

数据库流式查询,java,mysql,jvm,java

为了避免OOM,通常我们会使用分页查询,或者下面的两种方式。

2、流式查询

public static void streamQuery() throws Exception { 
    Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3307/test?useSSL=false", "root", "123456");
    PreparedStatement statement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);    
    statement.setFetchSize(Integer.MIN_VALUE); 
    //或者通过 com.mysql.jdbc.StatementImpl
    ((StatementImpl) statement).enableStreamingResults();
    
    ResultSet rs = statement.executeQuery();
    while (rs.next()) {
        System.out.println(rs.getString(2));
    }
    rs.close();
    statement.close();
    connection.close();
}

2.1)流式查询的条件:

随着大数据的到来,对于百万、千万的数据使用流式查询可以有效避免OOM。在执行statement.executeQuery()时不会从TCP响应流中读取完所有数据,当下面执行rs.next()时会按照需要从TCP响应流中读取部分数据。

  1. 创建Statement的时候需要制定ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY
  2. 设置fetchSize位Integer.MIN_VALUE

或者通过com.mysql.jdbc.StatementImpl的enableStreamingResults()方法设置。二者是一致的。看mysql的jdbc(com.mysql.jdbc.StatementImpl)源码:

数据库流式查询,java,mysql,jvm,java

2.2)流式查询原理:

1)基本概念

我们要知道jdbc客户端和mysql服务器之间是通过TCP建立的通信,使用mysql协议进行传输数据。首先声明一个概念:在三次握手建立了TCP连接后,就可以在这个通道上进行通信了,直到关闭该连接。

在 TCP 中发送端和接收端**可以是客户端/服务端,也可以是服务器/客户端**,通信的双方在任意时刻既可以是接收数据也可以是发送数据(全双工)。在通信中,收发双方都不保持记录的边界,所以需要按照一定的协议进行表示。在mysql中会按照mysql协议来进行交互。

有了上面的概念,我们重新来定义这两种查询:

在执行st.executeQuery()时,jdbc驱动会通过connection对象和mysql服务器建立TCP连接,同时在这个链接通道中发送sql命令,并接受返回。二者的区别是:

  1. 普通查询:也叫批量查询,jdbc客户端会阻塞的一次性从TCP通道中读取完mysql服务的返回数据;
  2. 流式查询:分批的从TCP通道中读取mysql服务返回的数据,每次读取的数据量并不是一行(通常是一个package大小),jdbc客户端在调用rs.next()方法时会根据需要从TCP流通道中读取部分数据。(并不是每次读区一行数据,网上说的几乎都是错的!)

2)源码查看:

从statement.executeQuery()方法跟进去,主要的调用连如下:

protected ResultSetInternalMethods executeInternal(int maxRowsToRetrieve, Buffer sendPacket, boolean createStreamingResultSet, boolean queryIsSelectOnly,
            Field[] metadataFromCache, boolean isBatch) throws SQLException {
        synchronized (checkClosed().getConnectionMutex()) {
            MySQLConnection locallyScopedConnection = this.connection;
            rs = locallyScopedConnection.execSQL(this, null, maxRowsToRetrieve, sendPacket, this.resultSetType, this.resultSetConcurrency,
                            createStreamingResultSet, this.currentCatalog, metadataFromCache, isBatch);
            return rs;
        }
public ResultSetInternalMethods execSQL(StatementImpl callingStatement, String sql, int maxRows, Buffer packet, int resultSetType, int resultSetConcurrency,
            boolean streamResults, String catalog, Field[] cachedMetadata, boolean isBatch) throws SQLException {
        synchronized (getConnectionMutex()) {
            return this.io.sqlQueryDirect(callingStatement, null, null, packet, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog,
                        cachedMetadata);
        }
}
final ResultSetInternalMethods sqlQueryDirect(StatementImpl callingStatement, String query, String characterEncoding, Buffer queryPacket, int maxRows,
            int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata) throws Exception {
        Buffer resultPacket = sendCommand(MysqlDefs.QUERY, null, queryPacket, false, null, 0);
        ResultSetInternalMethods rs = readAllResults(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, resultPacket,
                    false, -1L, cachedMetadata);
        return rs;
}
ResultSetImpl readAllResults(StatementImpl callingStatement, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults,
            String catalog, Buffer resultPacket, boolean isBinaryEncoded, long preSentColumnCount, Field[] metadataFromCache) throws SQLException {
        ResultSetImpl topLevelResultSet = readResultsForQueryOrUpdate(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog,
                resultPacket, isBinaryEncoded, preSentColumnCount, metadataFromCache);
        return topLevelResultSet;
}
protected final ResultSetImpl readResultsForQueryOrUpdate(StatementImpl callingStatement, int maxRows, int resultSetType, int resultSetConcurrency,
            boolean streamResults, String catalog, Buffer resultPacket, boolean isBinaryEncoded, long preSentColumnCount, Field[] metadataFromCache) throws SQLException {
            com.mysql.jdbc.ResultSetImpl results = getResultSet(callingStatement, columnCount, maxRows, resultSetType, resultSetConcurrency, streamResults,
                    catalog, isBinaryEncoded, metadataFromCache);
            return results;
        }
}
protected ResultSetImpl getResultSet(StatementImpl callingStatement, long columnCount, int maxRows, int resultSetType, int resultSetConcurrency,
            boolean streamResults, String catalog, boolean isBinaryEncoded, Field[] metadataFromCache) throws SQLException {
        Buffer packet; // The packet from the server
        RowData rowData = null;
        if (!streamResults) {
            rowData = readSingleRowSet(columnCount, maxRows, resultSetConcurrency, isBinaryEncoded, (metadataFromCache == null) ? fields : metadataFromCache);
        } else {
            rowData = new RowDataDynamic(this, (int) columnCount, (metadataFromCache == null) ? fields : metadataFromCache, isBinaryEncoded);
            this.streamingData = rowData;
        }
        ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog, (metadataFromCache == null) ? fields : metadataFromCache, rowData, resultSetType,
                resultSetConcurrency, isBinaryEncoded);
        return rs;
}

说明:

  1. sqlQueryDirect()方法中的sendCommand会通过io发送sql命令请求到mysql服务器,并获取返回流mysqlOutput
  2. getResultSet()方法会判断是否是流式查询还是批量查询。MySQL驱动会根据不同的参数设置选择对应的ResultSet实现类,分别对应三种查询方式:
  • RowDataStatic 静态结果集,默认的查询方式,普通查询
  • RowDataDynamic 动态结果集,流式查询
  • RowDataCursor 游标结果集,服务器端基于游标查询

看上述代码(41行),对于批量查询:readSingleRowSet方法会循环掉用nextRow方法获取所有数据,然后放到jvm内存的rows中:

数据库流式查询,java,mysql,jvm,java

对于流式查询:直接创建RowDataDynamic对象返回。后面在掉用rs.next()获取数据时会根据需要从mysqlOutput流中读取数据。

2.3)流式查询的坑:

public static void streamQuery2() throws Exception { 
    Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3307/test?useSSL=false", "root", "123456");
    //statement1
    PreparedStatement statement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);    
    statement.setFetchSize(Integer.MIN_VALUE); 
    ResultSet rs = statement.executeQuery();
    if (rs.next()) {
        System.out.println(rs.getString(2));
    }
    //statement2
    PreparedStatement statement2 = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);    
    statement2.setFetchSize(Integer.MIN_VALUE); 
    ResultSet rs2 = statement2.executeQuery();
    if (rs2.next()) {
        System.out.println(rs2.getString(2));
    }
//      rs.close();
//      statement.close();
//      connection.close();
}

执行结果:

test1
java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@45c8e616 is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:869)
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:865)
	at com.mysql.jdbc.MysqlIO.checkForOutstandingStreamingData(MysqlIO.java:3217)
	at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2453)
	at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683)
	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2482)
	at com.mysql.jdbc.StatementImpl.executeSimpleNonQuery(StatementImpl.java:1465)
	at com.mysql.jdbc.StatementImpl.setupStreamingTimeout(StatementImpl.java:726)
	at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1939)
	at com.tencent.clue_disp_api.MysqlTest.streamQuery2(MysqlTest.java:79)
	at com.tencent.clue_disp_api.MysqlTest.main(MysqlTest.java:25)

MySQL Connector/J 5.1 Developer Guide中原文:

There are some caveats with this approach. You must read all of the rows in the result set (or close it) before you can issue any other queries on the connection, or an exception will be thrown. 也就是说当通过流式查询获取一个ResultSet后,通过next迭代出所有元素之前或者调用close关闭它之前,不能使用同一个数据库连接去发起另外一个查询,否者抛出异常(第一次调用的正常,第二次的抛出异常)。

2.4)抓包验证:

数据库流式查询,java,mysql,jvm,java

查看3307 > 62169的包可以发现,ack都是1324,证明都是针对当时sql请求的返回数据。

数据库流式查询,java,mysql,jvm,java

3、游标查询

public static void cursorQuery() throws Exception {
    Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3307/test?useSSL=false&useCursorFetch=true", "root", "123456");
    ((JDBC4Connection) connection).setUseCursorFetch(true); //com.mysql.jdbc.JDBC4Connection
    Statement statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);    
    statement.setFetchSize(2);    
    ResultSet rs = statement.executeQuery(sql);    
    while (rs.next()) {
        System.out.println(rs.getString(2));
        Thread.sleep(5000);
    }
    
    rs.close();
    statement.close();
    connection.close();
}

1)说明:

  • 在连接参数中需要拼接useCursorFetch=true;
  • 创建Statement时需要设置ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY
  • 设置fetchSize控制每一次获取多少条数据

2)抓包验证:

通过wireshark抓包,可以看到每执行一次rs.next() 就会向mysql服务发送一个请求,同时mysql服务返回两条数据:

数据库流式查询,java,mysql,jvm,java

3)游标查询需要注意的点:

由于MySQL方不知道客户端什么时候将数据消费完,而自身的对应表可能会有DML写入操作,此时MySQL需要建立一个临时空间来存放需要拿走的数据。因此对于当你启用useCursorFetch读取大表的时候会看到MySQL上的几个现象:文章来源地址https://www.toymoban.com/news/detail-717292.html

  1. IOPS飙升 (IOPS (Input/Output Per Second):磁盘每秒的读写次数)
  2. 磁盘空间飙升
  3. 客户端JDBC发起SQL后,长时间等待SQL响应数据,这段时间就是服务端在准备数据
  4. 在数据准备完成后,开始传输数据的阶段,网络响应开始飙升,IOPS由“读写”转变为“读取”。
  5. CPU和内存会有一定比例的上升

到了这里,关于mysql JDBC的三种查询(普通、流式、游标)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • MySQL大数据表处理的三种方案,查询效率嘎嘎高

    场景 当我们业务数据库表中的数据越来越多,如果你也和我遇到了以下类似场景,那让我们一起来解决这个问题 数据的插入,查询时长较长 后续业务需求的扩展 在表中新增字段 影响较大 表中的数据并不是所有的都为有效数据 需求只查询时间区间内的 评估表数据体量 我们可

    2024年02月13日
    浏览(51)
  • mysql查询结果命令行方式导出/输出/写入到文件的三种方法

    直接执行命令: 在目录/tmp/下会产生文件test.xls 遇到的问题: 可能原因:mysql没有向/data/下写的权限 查询都自动写入文件: 跳出mysql命令行

    2024年02月11日
    浏览(43)
  • Linux更改普通用户密码的三种方法

    Linux服务器使用root管理员用户创建完成普通用户之后,为了后续使用该用户能够登录服务器(/etc/passwd中每个用户的shell类型为/bin/bash的可用来登录linux服务器),我们需要为普通用户设置登录密码。 useradd用户创建: https://blog.csdn.net/z19861216/article/details/130613814 Linux下,使用root管

    2024年02月15日
    浏览(43)
  • JDBC连接Oracle的三种URL格式

    使用jdbc连接oracle时url有三种格式 格式一: Oracle JDBC Thin using an SID 这种格式是最简单也是用得最多的。 你的oracle的sid可以通过一下指令获得: 格式二: Oracle JDBC Thin using a ServiceName 注意这里的格式,@后面有//, port后面:换成了/,这种格式是Oracle 推荐的格式,因为对于集群来说,

    2024年02月16日
    浏览(43)
  • linux中赋予普通用户root权限的三种方式

    在成功拿下一个服务器并提权成为root用户以后,为了减少root用户使用时间从而减少被网站管理员发现的概率,就需要新建一个普通用户,并赋予其完全的管理员权限,有以下三种常见的方式 1. 直接修改/etc/passwd文件 /etc/passwd文件里的第三列是UID,这个值为0的话就表示这个用

    2024年01月21日
    浏览(57)
  • 解决数据库查询时间过长导致com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure

    大数据量下数据库查询中断,抛出异常,异常信息见附录1。 使用springboot项目测试分库分表,使用sharding-jdbc插件,2000w数据量查询总数count(*)查询,查询失败,经过排查,排除了sharding-jdbc插件的问题,还原原始的mybatis-plus查询方式依然报错,经过查阅网上的相关文章,尝试修

    2024年02月11日
    浏览(110)
  • ES中的三种查询

    Es有三种查询方式,不知道你平时工作中用到的是哪种呢? 一、from+Size 1、深度分页或者size特别大的时候,会出现deep pagination问题.并且因为Es自身的保护机制(max_result_window是10000),如果查出来的数据量大于10000的就会报错. 2、该查询的实际原理类似于mysql中的limit,比如查询第10001条数

    2023年04月09日
    浏览(38)
  • Mybatis的三种映射关系以及联表查询

    目录 一、概念 二、一对一 1、配置generatorConfig.xml 2、Vo包的编写 3、xml的sql编写 4、编写对应接口及实现类 5、测试 三、一对多 1、Vo包类的编写 2、xml的sql编写 3、编写对应接口及实现类 4、测试 四、多对多 1、Vo类 2、xml的sql配置 3、接口及接口实现类 4、测试 1、MyBatis中表之间

    2024年02月10日
    浏览(48)
  • Mybatis-puls——条件查询的三种格式+条件查询null判定+查询投影

    在mybatis_plus的封装中的WrapperT接口参数就是用于封装查询条件   在测试类中启动如上一个简单的查询,然后控制台运行会输出一大堆无关日志,这里先把这些日志关闭 先新建一个XML配置文件   然后变成如下,这里configuration标签里面什么都没有配置就是取消所有日志文件了

    2024年01月18日
    浏览(37)
  • 数据库批量插入数据的三种方法

    测试环境:SpringBoot项目+MybatisPlus框架+MySQL数据库+Lombok 方法一: for循环插入(单条) (总耗时:n分钟,每次都要获取连接Connection、释放连接和关闭资源等操作,比较耗时,这里就没测了) 方法二: 批量插入saveBatch (4~7秒,这里用到了MybatisPLus的saveBatch批量插入方法,实际

    2024年02月14日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包