Using streaming queries in MySQL to avoid OOM on large result sets: a follow-up

1. Preface

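A previous article, http://www.jianshu.com/p/0339c6fe8b61, introduced two ways of using streaming queries in MySQL. They look elegant, but that elegance comes with some caveats, which this follow-up covers.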

2. A connection cannot be reused while a cursor is still iterating over its data

2.1 Brief overview

First, the original text from the MySQL Connector/J 5.1 Developer Guide:

There are some caveats with this approach. You must read all of the rows
in the result set (or close it) before you can issue any other queries
on the connection, or an exception will be thrown.
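In other words, once a streaming query has returned a ResultSet, you cannot use the same database connection to issue another query until you have either iterated through every row with next or closed the ResultSet with close. Otherwise an exception is thrown (the first call works normally, the second one throws).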

Therefore, if using streaming results, process them as quickly as
possible if you want to maintain concurrent access to the tables
referenced by the statement producing the result set.
If you want to maintain concurrent access to the tables, process the contents of the streaming ResultSet as quickly as possible.

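The reason for this restriction is as follows. In the non-cursor case, by the time we obtain the ResultSet the MySQL client has already loaded all of the data into it, so the connection is idle and can be used for other queries. With a streaming query, the ResultSet returned to us does not yet hold all of the data; each call to next uses the connection to fetch data from the server. The connection therefore stays occupied until all of the data has been read, which is why the same connection cannot be reused while the cursor is still iterating.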
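The difference between the two modes can be illustrated with a toy model. This is not Connector/J code: `ToyConnection`, `ToyStreamingResult`, and all of their methods are hypothetical names invented for this sketch, which only mimics the rule that a connection stays occupied until its streaming result is exhausted or closed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Toy model only: hypothetical classes, not part of Connector/J.
class ToyConnection {
    private ToyStreamingResult active; // the open streaming result, if any

    // Buffered mode: all rows are copied to the client before returning,
    // so the connection is idle again immediately.
    List<String> queryBuffered(List<String> serverRows) {
        ensureIdle();
        return new ArrayList<>(serverRows);
    }

    // Streaming mode: rows are pulled one at a time through the connection,
    // which stays occupied until the result is exhausted or closed.
    ToyStreamingResult queryStreaming(List<String> serverRows) {
        ensureIdle();
        active = new ToyStreamingResult(this, serverRows.iterator());
        return active;
    }

    boolean isBusy() {
        return active != null;
    }

    void release(ToyStreamingResult result) {
        if (active == result) {
            active = null;
        }
    }

    private void ensureIdle() {
        if (active != null) {
            throw new IllegalStateException("a streaming result is still active on this connection");
        }
    }

    public static void main(String[] args) {
        ToyConnection conn = new ToyConnection();
        List<String> rows = Arrays.asList("r1", "r2");
        ToyStreamingResult rs = conn.queryStreaming(rows);
        try {
            conn.queryBuffered(rows); // rejected: the stream is still open
        } catch (IllegalStateException e) {
            System.out.println("second query: " + e.getMessage());
        }
        rs.close();
        System.out.println("rows after close: " + conn.queryBuffered(rows).size());
    }
}

class ToyStreamingResult implements AutoCloseable {
    private final ToyConnection owner;
    private final Iterator<String> rows;

    ToyStreamingResult(ToyConnection owner, Iterator<String> rows) {
        this.owner = owner;
        this.rows = rows;
    }

    // Returns the next row, or null once exhausted (which frees the connection).
    String next() {
        if (rows.hasNext()) {
            return rows.next();
        }
        owner.release(this);
        return null;
    }

    @Override
    public void close() {
        owner.release(this);
    }
}
```

In the model, a buffered query never leaves the connection busy, while a streaming query blocks every other query until the last row has been read or close has been called.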

2.2 An example

public static void testOneConn() throws Exception {

        System.out.print("begin");
        String cmdSql = "select app_key,device_id,brand from test where app_key='111' AND app_version = '2.2.2' and package_name='com.taobao.taobao.test'";

        Connection conn = null;
        PreparedStatement stmt = null;
        ResultSet rs = null;

        ExecutorService executor = Executors.newSingleThreadExecutor();

        try {

            conn = ds.getConnection();

            stmt = conn.prepareStatement(cmdSql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            stmt.setFetchSize(Integer.MIN_VALUE);
            // first query (1)
            rs = stmt.executeQuery();
            final ResultSet tempRs = rs;
            Future<Integer> feture = executor.submit(new Callable<Integer>() {
                public Integer call() throws InterruptedException {
                    try {
                        while (tempRs.next()) {
                            try {
                                System.out.println("app_key:" + tempRs.getString(1) + "device_id:" + tempRs.getString(2)
                                        + "brand:" + tempRs.getString(3));
                            } catch (SQLException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }

                        }
                    } catch (SQLException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    return null;
                }
            });

            Thread.sleep(2000);

            try {
                // second query (2)
                stmt.executeQuery();

            } catch (Exception e) {
                System.out.println("second search:" + e.getLocalizedMessage());

            }

            // wait for the child thread to finish
            feture.get();

        } catch (Exception e) {
            System.out.println("first search:" + e.getLocalizedMessage());
        } finally {

            close(stmt, rs, conn);

        }
    }

Running the code above prints:
app_key:111device_id:222brand:333
second search:Streaming result set
com.mysql.jdbc.RowDataDynamic@3e0c5a62 is still active. No statements
may be issued when any streaming result sets are open and in use on a
given connection. Ensure that you have called .close() on any active
streaming result sets before attempting more queries.
app_key:111device_id:232332brand:45454
app_key:111device_id:54brand:eyu345
……..
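As shown, the second query threw an exception saying that the result set RowDataDynamic@3e0c5a62 is still active: no new query may be issued on a connection that already has an open streaming ResultSet, and close must be called on that ResultSet before attempting further queries.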

The first query is unaffected and continues iterating over its own data.

So let's see what happens if close is called before the second query.

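Add rs.close(); before query (2) and run again. At first the child thread keeps printing rows. Then the main thread calls close, after which the child thread stops printing, yet the main thread's close call does not return either. What is going on? Let's look at what close does internally: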

public void close() throws SQLException {
        ...

        // obtain the connection's lock
        Object mutex = this;

        MySQLConnection conn = null;

        if (this.owner != null) {
            conn = this.owner.connection;

            if (conn != null) {
                mutex = conn.getConnectionMutex();
            }
        }

        synchronized (mutex) {
            // drain the rest of the records.
            while (next() != null) {
                hadMore = true;
                howMuchMore++;

                if (howMuchMore % 100 == 0) {
                    Thread.yield();
                }
            }

        ...
    }

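As shown, close still calls next in a loop to read the remaining records and discard them. Our close call did not return because it was busy discarding data. The documentation says a new query may be issued once iteration has finished or close has been called, but calling close effectively still has to iterate through all of the data left in the ResultSet.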
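The cost of this draining can be sketched with another toy model. `DrainingResult` and its methods are hypothetical names made up for illustration, not driver classes; the sketch only mirrors the loop in the close method shown above and counts how many rows close has to throw away.

```java
import java.util.Collections;
import java.util.Iterator;

// Toy model only: a hypothetical stand-in for a streaming result set.
class DrainingResult implements AutoCloseable {
    private final Iterator<String> rows;
    private int discarded; // rows read and thrown away by close()

    DrainingResult(Iterator<String> rows) {
        this.rows = rows;
    }

    // Returns the next row, or null when no rows are left.
    String next() {
        return rows.hasNext() ? rows.next() : null;
    }

    // Like the driver code above, close() reads every remaining row.
    @Override
    public void close() {
        while (next() != null) {
            discarded++;
        }
    }

    int discardedRows() {
        return discarded;
    }

    public static void main(String[] args) {
        DrainingResult rs = new DrainingResult(Collections.nCopies(1000, "row").iterator());
        for (int i = 0; i < 10; i++) {
            rs.next(); // the caller only wanted the first 10 rows
        }
        rs.close();
        System.out.println("rows discarded by close: " + rs.discardedRows());
    }
}
```

Under this model, closing early saves nothing on the read side: the work done by close grows with the number of unread rows. If only a few rows are needed, it is probably cheaper to narrow the query itself than to rely on an early close.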

One more question remains: as noted above, the child thread also stopped printing results. Why? Let's look at the next method again:

public boolean next() throws SQLException {
        synchronized (checkClosed().getConnectionMutex()) {

            ....
        }
    }

protected final MySQLConnection checkClosed() throws SQLException {
    MySQLConnection c = this.connection;

    if (c == null) {
        throw SQLError.createSQLException(
                Messages
                        .getString("ResultSet.Operation_not_allowed_after_ResultSet_closed_144"), //$NON-NLS-1$
                SQLError.SQL_STATE_GENERAL_ERROR, getExceptionInterceptor());
    }

    return c;
}

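So that is it: next also acquires the connection's lock first, and that lock is currently held by the close method. You might object that synchronized is a reentrant lock, so why can't the call to next get in?
True, synchronized is reentrant, but close and next are being called from different threads.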
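The reentrancy point can be checked with a small standalone sketch. `MonitorDemo` and its method names are hypothetical; the class has nothing to do with the driver and only demonstrates the general rule that a monitor can be re-entered by the thread that holds it, while any other thread blocks.

```java
// Standalone illustration of monitor reentrancy; hypothetical class, unrelated to the driver.
class MonitorDemo {
    private final Object mutex = new Object();

    // The same thread can acquire a monitor it already holds.
    boolean sameThreadCanReenter() {
        synchronized (mutex) {
            synchronized (mutex) { // second acquisition by the same thread succeeds
                return Thread.holdsLock(mutex);
            }
        }
    }

    // A different thread cannot enter while this thread holds the monitor.
    Thread.State otherThreadStateWhileHeld() {
        Thread other = new Thread(() -> {
            synchronized (mutex) {
                // reached only after the holder has left its synchronized block
            }
        });
        Thread.State observed;
        try {
            synchronized (mutex) {
                other.start();
                long deadline = System.currentTimeMillis() + 5000;
                // poll until the other thread is parked on the monitor (or we give up)
                while (other.getState() != Thread.State.BLOCKED
                        && System.currentTimeMillis() < deadline) {
                    Thread.sleep(10);
                }
                observed = other.getState();
            }
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return observed;
    }

    public static void main(String[] args) {
        MonitorDemo demo = new MonitorDemo();
        System.out.println("same thread re-enters: " + demo.sameThreadCanReenter());
        System.out.println("other thread state: " + demo.otherThreadStateWhileHeld());
    }
}
```

This matches the situation above: the main thread's close holds the connection mutex and re-enters it freely through its own internal next calls, while the child thread's next waits on the same mutex until close has finished draining.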

3. MyBatisCursorItemReader is not thread-safe

The previous article showed that MyBatisCursorItemReader lets us drive the cursor ourselves. To use it, simply declare it in the XML configuration:

<bean id="myMyBatisCursorItemReader" class="org.mybatis.spring.batch.MyBatisCursorItemReader">
    <property name="sqlSessionFactory" ref="sqlSessionFactory" />
    <property name="queryId"
        value="com.taobao.accs.mass.petadata.dal.sqlmap.AccsDeviceInfoDAOMapper.selectByExampleForPetaData" />
</bean>

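When only one thread uses myMyBatisCursorItemReader to run queries, everything is elegant and works fine. But when multiple threads call open and read on the same myMyBatisCursorItemReader, problems arise, because it is not thread-safe. Here is the code of MyBatisCursorItemReader: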

public class MyBatisCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements InitializingBean {

  private String queryId;

  private SqlSessionFactory sqlSessionFactory;
  private SqlSession sqlSession;

  private Map<String, Object> parameterValues;

  private Cursor<T> cursor;
  private Iterator<T> cursorIterator;

  ...
  @Override
  protected T doRead() throws Exception {
    T next = null;
    if (cursorIterator.hasNext()) {
      next = cursorIterator.next();
    }
    return next;
  }

  @Override
  protected void doOpen() throws Exception {
    Map<String, Object> parameters = new HashMap<String, Object>();
    if (parameterValues != null) {
      parameters.putAll(parameterValues);
    }

    sqlSession = sqlSessionFactory.openSession(ExecutorType.SIMPLE);
    cursor = sqlSession.selectCursor(queryId, parameters);
    cursorIterator = cursor.iterator();
  }

  @Override
  protected void doClose() throws Exception {
    cursor.close();
    sqlSession.close();
    cursorIterator = null;
  }

  ...
}

So the following fields are all shared, unsynchronized state:
private SqlSession sqlSession;
private Map<String, Object> parameterValues;
private Cursor<T> cursor;
private Iterator<T> cursorIterator;

What if we change them to ThreadLocal? There is still a problem. To see why, look at the parent class:

public abstract class AbstractItemCountingItemStreamItemReader<T> extends AbstractItemStreamItemReader<T> {

    private static final String READ_COUNT = "read.count";

    private static final String READ_COUNT_MAX = "read.count.max";

    private int currentItemCount = 0;

    private int maxItemCount = Integer.MAX_VALUE;

    private boolean saveState = true;


    protected void jumpToItem(int itemIndex) throws Exception {
        for (int i = 0; i < itemIndex; i++) {
            read();
        }
    }

    @Override
    public T read() throws Exception, UnexpectedInputException, ParseException {
        if (currentItemCount >= maxItemCount) {
            return null;
        }
        currentItemCount++;
        T item = doRead();
        if(item instanceof ItemCountAware) {
            ((ItemCountAware) item).setItemCount(currentItemCount);
        }
        return item;
    }

    protected int getCurrentItemCount() {
        return currentItemCount;
    }


    public void setCurrentItemCount(int count) {
        this.currentItemCount = count;
    }


    public void setMaxItemCount(int count) {
        this.maxItemCount = count;
    }

    @Override
    public void close() throws ItemStreamException {
        super.close();
        currentItemCount = 0;
        try {
            doClose();
        }
        catch (Exception e) {
            throw new ItemStreamException("Error while closing item reader", e);
        }
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        super.open(executionContext);
        try {
            doOpen();
        }
        catch (Exception e) {
            throw new ItemStreamException("Failed to initialize the reader", e);
        }
        if (!isSaveState()) {
            return;
        }

        if (executionContext.containsKey(getExecutionContextKey(READ_COUNT_MAX))) {
            maxItemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT_MAX));
        }

        int itemCount = 0;
        if (executionContext.containsKey(getExecutionContextKey(READ_COUNT))) {
            itemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT));
        }
        else if(currentItemCount > 0) {
            itemCount = currentItemCount;
        }

        if (itemCount > 0 && itemCount < maxItemCount) {
            try {
                jumpToItem(itemCount);
            }
            catch (Exception e) {
                throw new ItemStreamException("Could not move to stored position on restart", e);
            }
        }

        currentItemCount = itemCount;

    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        super.update(executionContext);
        if (saveState) {
            Assert.notNull(executionContext, "ExecutionContext must not be null");
            executionContext.putInt(getExecutionContextKey(READ_COUNT), currentItemCount);
            if (maxItemCount < Integer.MAX_VALUE) {
                executionContext.putInt(getExecutionContextKey(READ_COUNT_MAX), maxItemCount);
            }
        }

    }


    public void setSaveState(boolean saveState) {
        this.saveState = saveState;
    }

    /**
     * The flag that determines whether to save internal state for restarts.
     * @return true if the flag was set
     */
    public boolean isSaveState() {
        return saveState;
    }

}
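Note the `currentItemCount++` in read() above: it is a plain int field incremented without any synchronization. The following standalone sketch shows why that matters when several threads share one instance. `CounterRaceDemo` is a hypothetical class written for this illustration and is not Spring Batch code; the AtomicInteger is included only as a contrast, not as a suggested patch to the parent class.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: compares an unsynchronized counter with an atomic one.
class CounterRaceDemo {
    private int plainCount; // like currentItemCount: no synchronization
    private final AtomicInteger atomicCount = new AtomicInteger();

    // Starts `threads` workers that each increment both counters `perThread` times.
    // Returns { plainCount, atomicCount } after all workers have finished.
    int[] run(int threads, int perThread) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    plainCount++;                  // read-modify-write: updates can be lost
                    atomicCount.incrementAndGet(); // atomic: never loses an update
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { plainCount, atomicCount.get() };
    }

    public static void main(String[] args) {
        int[] counts = new CounterRaceDemo().run(4, 100_000);
        System.out.println("plain counter:  " + counts[0]);
        System.out.println("atomic counter: " + counts[1]);
    }
}
```

The atomic counter always ends at threads × perThread, while the plain counter may end lower because concurrent increments can overwrite each other. A reader whose item count drifts like this can no longer be trusted to enforce maxItemCount or to record a correct restart position.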

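Because it also contains currentItemCount, which is not thread-safe. Looking back, this parent class is of no use to us here: its job is to limit the number of records that are iterated, and if that limit is not needed the class can be dropped. The reader can therefore be reworked into a thread-safe version: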

public class MyBatisCursorItemReaderThreadSafe<T> implements InitializingBean {

    private String queryId;

    private SqlSessionFactory sqlSessionFactory;
    private ThreadLocal<SqlSession> sqlSession = new ThreadLocal<SqlSession>();

    private ThreadLocal<Map<String, Object>> parameterValues = new ThreadLocal<Map<String, Object>>();

    private ThreadLocal<Cursor<T>> cursor = new ThreadLocal<Cursor<T>>();
    private ThreadLocal<Iterator<T>> cursorIterator = new ThreadLocal<Iterator<T>>();

    public MyBatisCursorItemReaderThreadSafe() {
    }

    public T doRead() throws Exception {
        T next = null;
        if (cursorIterator.get().hasNext()) {
            next = cursorIterator.get().next();
        }
        return next;
    }

    public void doOpen() throws Exception {
        Map<String, Object> parameters = new HashMap<String, Object>();
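        // parameterValues is a ThreadLocal and is never null itself; check the value it holds
        Map<String, Object> values = parameterValues.get();
        if (values != null) {
            parameters.putAll(values);
        }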
        SqlSession sqlSessionTmp = null;
        Cursor<T> cursorTemp = null;
        Iterator<T> cursorIteratorTmp = null;
        sqlSessionTmp = sqlSessionFactory.openSession(ExecutorType.SIMPLE);
        cursorTemp = sqlSessionTmp.selectCursor(queryId, parameters);
        cursorIteratorTmp = cursorTemp.iterator();

        // publish to the thread-locals only after everything above has succeeded
        cursor.set(cursorTemp);
        cursorIterator.set(cursorIteratorTmp);
        this.sqlSession.set(sqlSessionTmp);

    }

    public void doClose() throws Exception {

        Cursor<T> cursorTemp = cursor.get();
        if (null != cursorTemp) {
            cursorTemp.close();
        }

        // guard against doClose() being called on a thread that never opened a session
        SqlSession sqlSessionTmp = sqlSession.get();
        if (null != sqlSessionTmp) {
            sqlSessionTmp.close();
        }

        // clear this thread's values so pooled threads do not keep stale references
        cursor.remove();
        sqlSession.remove();
        cursorIterator.remove();
        parameterValues.remove();
    }


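    public void setParameterValues(Map<String, Object> parameterValues) {
        this.parameterValues.set(parameterValues);
    }

    // setters used by the XML property injection (queryId, sqlSessionFactory)
    public void setQueryId(String queryId) {
        this.queryId = queryId;
    }

    public void setSqlSessionFactory(SqlSessionFactory sqlSessionFactory) {
        this.sqlSessionFactory = sqlSessionFactory;
    }

    // required by InitializingBean: fail fast if the reader is misconfigured
    @Override
    public void afterPropertiesSet() throws Exception {
        if (sqlSessionFactory == null || queryId == null) {
            throw new IllegalArgumentException("sqlSessionFactory and queryId are required");
        }
    }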

}

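Of course, there is an even simpler approach: use prototype scope, so that every getBean call creates a new object, or simply new up an instance each time one is needed.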
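The "one instance per use" idea can be sketched without Spring. In the sketch below, `ToyReader` is a hypothetical stand-in for a stateful, non-thread-safe reader, and the `Supplier` plays the role that a prototype-scoped bean lookup (a bean declared with `scope="prototype"`) would play in a real application; none of these names come from MyBatis or Spring Batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical stand-in for a stateful reader that is not thread-safe.
class ToyReader {
    private final Iterator<String> cursor;

    ToyReader(List<String> rows) {
        this.cursor = rows.iterator();
    }

    // Returns the next row, or null when the cursor is exhausted.
    String read() {
        return cursor.hasNext() ? cursor.next() : null;
    }
}

class PerTaskReaderDemo {
    // Every task asks the factory for its own reader, so no cursor state is shared.
    static List<Integer> countRowsPerTask(Supplier<ToyReader> factory, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(pool.submit(() -> {
                    ToyReader reader = factory.get(); // fresh instance per task
                    int count = 0;
                    while (reader.read() != null) {
                        count++;
                    }
                    return count;
                }));
            }
            List<Integer> counts = new ArrayList<>();
            for (Future<Integer> future : futures) {
                counts.add(future.get());
            }
            return counts;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a", "b", "c");
        System.out.println(countRowsPerTask(() -> new ToyReader(rows), 4));
    }
}
```

Because each task owns its reader, every task sees the complete row set and no synchronization is needed. The trade-off is that each instance opens its own session and cursor, so the number of concurrently open connections grows with the number of tasks.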
