Using Streaming Queries in MySQL to Avoid OOM from Large Result Sets: Follow-up

1. Introduction

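The earlier post at http://www.jianshu.com/p/0339c6fe8b61
introduced three ways to use streaming queries in MySQL. They look elegant, but the elegance comes with a few caveats. This post walks through what to watch out for when using streaming queries.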

2. A connection cannot be reused while a cursor is still iterating over its data

2.1 Brief overview

First, the original text from the MySQL Connector/J 5.1 Developer Guide:

There are some caveats with this approach. You must read all of the rows
in the result set (or close it) before you can issue any other queries
on the connection, or an exception will be thrown.
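In other words, once a streaming query has returned a ResultSet, you cannot use the same database connection to issue another query until you have either iterated over every row with next or closed the ResultSet with close. Otherwise an exception is thrown (the first query works normally; the second one throws).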

Therefore, if using streaming results, process them as quickly as
possible if you want to maintain concurrent access to the tables
referenced by the statement producing the result set.
If you want to maintain concurrent access to the tables, process the contents of the streaming ResultSet as quickly as possible.

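The restriction exists for the following reason. Without streaming, by the time we receive the ResultSet the MySQL client has already loaded all of the data into it, so the database connection is idle and can run other queries. With a streaming query, the ResultSet handed back to us does not yet hold all of the data: each call to next has to use the database connection to fetch data from the server. The connection therefore stays occupied until all of the data has been read, which is why the same connection cannot be reused while the cursor is still iterating.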
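To make the reasoning above concrete, here is a minimal toy model in plain Java. It is not Connector/J code: ToyConnection and ToyStreamingResult are hypothetical names, the "server" is just an in-memory list, and only the "read all of the rows" half of the rule is modeled. It shows a connection staying busy while a streaming result still has unread rows, and becoming usable again once the rows run out.

```java
import java.util.Iterator;
import java.util.List;

/** Toy model (not Connector/J) of a connection that serves one streaming result at a time. */
class ToyConnection {
    private ToyStreamingResult active; // the streaming result currently using this connection

    /** Issues a "query"; rejected while an earlier streaming result still has unread rows. */
    ToyStreamingResult query(List<String> serverRows) {
        if (active != null) {
            throw new IllegalStateException("streaming result is still active on this connection");
        }
        active = new ToyStreamingResult(this, serverRows.iterator());
        return active;
    }

    /** Called by the result once its last row has been read. */
    void release(ToyStreamingResult result) {
        if (active == result) {
            active = null;
        }
    }

    boolean isBusy() {
        return active != null;
    }
}

class ToyStreamingResult {
    private final ToyConnection owner;
    private final Iterator<String> wire; // stands in for rows that are still on the server
    private String current;

    ToyStreamingResult(ToyConnection owner, Iterator<String> wire) {
        this.owner = owner;
        this.wire = wire;
    }

    /** Pulls one row over the "wire"; frees the connection when no rows are left. */
    boolean next() {
        if (wire.hasNext()) {
            current = wire.next();
            return true;
        }
        current = null;
        owner.release(this);
        return false;
    }

    String get() {
        return current;
    }
}
```

In this sketch a second call to query fails for as long as isBusy() is true, and succeeds again after next() has returned false.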

2.2 An example

public static void testOneConn() throws Exception {

        System.out.print("begin");
        String cmdSql = "select app_key,device_id,brand from test where app_key='111' AND app_version = '2.2.2' and package_name='com.taobao.taobao.test'";

        Connection conn = null;
        PreparedStatement stmt = null;
        ResultSet rs = null;

        ExecutorService executor = Executors.newSingleThreadExecutor();

        try {

            conn = ds.getConnection();

            stmt = conn.prepareStatement(cmdSql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            stmt.setFetchSize(Integer.MIN_VALUE);
            // first query (1)
            rs = stmt.executeQuery();
            final ResultSet tempRs = rs;
            Future<Integer> feture = executor.submit(new Callable<Integer>() {
                public Integer call() throws InterruptedException {
                    try {
                        while (tempRs.next()) {
                            try {
                                System.out.println("app_key:" + tempRs.getString(1) + "device_id:" + tempRs.getString(2)
                                        + "brand:" + tempRs.getString(3));
                            } catch (SQLException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }

                        }
                    } catch (SQLException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    return null;
                }
            });

            Thread.sleep(2000);

            try {
                // second query (2)
                stmt.executeQuery();

            } catch (Exception e) {
                System.out.println("second search:" + e.getLocalizedMessage());

            }

            // wait for the child thread to finish
            feture.get();

        } catch (Exception e) {
            System.out.println("first search:" + e.getLocalizedMessage());
        } finally {

            close(stmt, rs, conn);

        }
    }

Running the code above produces:
app_key:111device_id:222brand:333
second search:Streaming result set
com.mysql.jdbc.RowDataDynamic@3e0c5a62 is still active. No statements
may be issued when any streaming result sets are open and in use on a
given connection. Ensure that you have called .close() on any active
streaming result sets before attempting more queries.
app_key:111device_id:232332brand:45454
app_key:111device_id:54brand:eyu345
……..
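As you can see, the second query threw an exception saying that the result set RowDataDynamic@3e0c5a62 is still active: no new query can be issued while a streaming ResultSet is open on the connection, and you must make sure close has been called before attempting more queries.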

The first query is unaffected and keeps iterating over its data.

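So let's see what happens if close is called before the second query. Add rs.close(); before query (2) and run again. At first the child thread keeps printing the rows it iterates over. Then the main thread calls close, after which the child thread stops printing, and the main thread's close call does not return either. What is going on? No rush, let's look at what close does: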

public void close() throws SQLException {
        ...

        // acquire the connection's mutex
        Object mutex = this;

        MySQLConnection conn = null;

        if (this.owner != null) {
            conn = this.owner.connection;

            if (conn != null) {
                mutex = conn.getConnectionMutex();
            }
        }

        synchronized (mutex) {
            // drain the rest of the records.
            while (next() != null) {
                hadMore = true;
                howMuchMore++;

                if (howMuchMore % 100 == 0) {
                    Thread.yield();
                }
            }

        ...
    }

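As you can see, close loops over next to read the remaining records and throw them away. Our close call did not return because it was still busy discarding data. The documentation says a new query can be issued only after all rows have been iterated or close has been called; in practice, close works by iterating over the rest of the ResultSet as well.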
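The cost of that draining loop can be sketched with a small stand-in. DrainingCursor below is a hypothetical class, not the Connector/J one, and its rows are just generated integers; the point is only that close has to walk every unread row, so closing early on a large result is not free.

```java
import java.util.Iterator;
import java.util.stream.IntStream;

/** Hypothetical stand-in for a streaming result; rows are generated integers. */
class DrainingCursor {
    private final Iterator<Integer> remaining;

    DrainingCursor(int totalRows) {
        this.remaining = IntStream.range(0, totalRows).iterator();
    }

    /** Advances to the next row; returns false when no rows are left. */
    boolean next() {
        if (!remaining.hasNext()) {
            return false;
        }
        remaining.next();
        return true;
    }

    /** Reads and discards every unread row, returning how many were thrown away. */
    int close() {
        int discarded = 0;
        while (next()) {
            discarded++;
        }
        return discarded;
    }
}
```

For example, after reading 10 rows of a 1000-row cursor, close() in this sketch still has to step through the other 990.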

One question remains: as noted above, the child thread also stopped printing. Why? Let's look again at the next method:

public boolean next() throws SQLException {
        synchronized (checkClosed().getConnectionMutex()) {

            ....
        }
    }

protected final MySQLConnection checkClosed() throws SQLException {
    MySQLConnection c = this.connection;

    if (c == null) {
        throw SQLError.createSQLException(
                Messages
                        .getString("ResultSet.Operation_not_allowed_after_ResultSet_closed_144"), //$NON-NLS-1$
                SQLError.SQL_STATE_GENERAL_ERROR, getExceptionInterceptor());
    }

    return c;
}

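So that's it: next also has to acquire the connection's lock first, but that lock is currently held by close. You might object that synchronized is a reentrant lock, so why can't next get in?
True, synchronized is reentrant, but close and next are being called from different threads.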
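The difference between re-entering a monitor on the same thread and contending for it from another thread can be checked with a few lines of plain Java. MonitorDemo is a hypothetical class written for this post; its mutex field merely plays the role of the connection mutex.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class MonitorDemo {
    private final Object mutex = new Object(); // plays the role of the connection mutex

    /** The same thread can enter a monitor it already holds. */
    boolean sameThreadCanReenter() {
        synchronized (mutex) {
            synchronized (mutex) {
                return Thread.holdsLock(mutex);
            }
        }
    }

    /**
     * Holds the monitor on the calling thread and reports whether another
     * thread managed to enter it within waitMillis.
     */
    boolean otherThreadEntersWhileHeld(long waitMillis) {
        CountDownLatch entered = new CountDownLatch(1);
        Thread other = new Thread(() -> {
            synchronized (mutex) {
                entered.countDown();
            }
        });
        boolean enteredWhileHeld;
        synchronized (mutex) {
            other.start();
            try {
                // await() does not release the monitor, so the other thread stays blocked
                enteredWhileHeld = entered.await(waitMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                enteredWhileHeld = false;
            }
        }
        try {
            other.join(); // the monitor is free now, so the other thread can finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return enteredWhileHeld;
    }
}
```

sameThreadCanReenter() succeeds, while otherThreadEntersWhileHeld(200) reports that the second thread never got in. That is the situation the child thread's next is in while the main thread's close holds the lock.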

3. MyBatisCursorItemReader is not thread-safe

The previous article showed that MyBatisCursorItemReader lets us drive the cursor ourselves. To use it, inject it in XML:

<bean id="myMyBatisCursorItemReader" class="org.mybatis.spring.batch.MyBatisCursorItemReader">
    <property name="sqlSessionFactory" ref="sqlSessionFactory" />
    <property name="queryId"
        value="com.taobao.accs.mass.petadata.dal.sqlmap.AccsDeviceInfoDAOMapper.selectByExampleForPetaData" />
</bean>

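When only one thread uses myMyBatisCursorItemReader to run queries, everything is elegant and works fine. But when several threads call open and read on myMyBatisCursorItemReader, problems appear, because the class is not thread-safe. Here is the MyBatisCursorItemReader code: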

public class MyBatisCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements InitializingBean {

  private String queryId;

  private SqlSessionFactory sqlSessionFactory;
  private SqlSession sqlSession;

  private Map<String, Object> parameterValues;

  private Cursor<T> cursor;
  private Iterator<T> cursorIterator;

  ...
  @Override
  protected T doRead() throws Exception {
    T next = null;
    if (cursorIterator.hasNext()) {
      next = cursorIterator.next();
    }
    return next;
  }

  @Override
  protected void doOpen() throws Exception {
    Map<String, Object> parameters = new HashMap<String, Object>();
    if (parameterValues != null) {
      parameters.putAll(parameterValues);
    }

    sqlSession = sqlSessionFactory.openSession(ExecutorType.SIMPLE);
    cursor = sqlSession.selectCursor(queryId, parameters);
    cursorIterator = cursor.iterator();
  }

  @Override
  protected void doClose() throws Exception {
    cursor.close();
    sqlSession.close();
    cursorIterator = null;
  }

  ...
}

So the following fields are not thread-safe:
private SqlSession sqlSession;
private Map<String, Object> parameterValues;
private Cursor<T> cursor;
private Iterator<T> cursorIterator;
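To see why a shared cursor field is a problem, here is a minimal sketch. SharedFieldReader is a hypothetical, simplified reader, not the MyBatis class, and the two "callers" are replayed step by step on a single thread so that the interleaving is deterministic.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical reader that keeps its cursor in a single shared field. */
class SharedFieldReader {
    private Iterator<String> cursorIterator; // one slot shared by every caller

    void open(List<String> rows) {
        cursorIterator = rows.iterator();
    }

    String read() {
        return cursorIterator.hasNext() ? cursorIterator.next() : null;
    }
}

class SharedFieldReaderDemo {
    /** Replays one possible interleaving of two callers that share a reader. */
    static String firstRowSeenByCallerA() {
        SharedFieldReader reader = new SharedFieldReader();
        reader.open(Arrays.asList("A1", "A2")); // caller A opens its query
        reader.open(Arrays.asList("B1", "B2")); // caller B opens before A has read anything
        return reader.read();                   // caller A reads, but receives B's row
    }
}
```

In this interleaving caller A gets "B1", a row from a query it never issued, because B's open overwrote the shared iterator.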

What if we turn them into ThreadLocal fields? That still leaves a problem. Why? Look at the parent class:

public abstract class AbstractItemCountingItemStreamItemReader<T> extends AbstractItemStreamItemReader<T> {

    private static final String READ_COUNT = "read.count";

    private static final String READ_COUNT_MAX = "read.count.max";

    private int currentItemCount = 0;

    private int maxItemCount = Integer.MAX_VALUE;

    private boolean saveState = true;


    protected void jumpToItem(int itemIndex) throws Exception {
        for (int i = 0; i < itemIndex; i++) {
            read();
        }
    }

    @Override
    public T read() throws Exception, UnexpectedInputException, ParseException {
        if (currentItemCount >= maxItemCount) {
            return null;
        }
        currentItemCount++;
        T item = doRead();
        if(item instanceof ItemCountAware) {
            ((ItemCountAware) item).setItemCount(currentItemCount);
        }
        return item;
    }

    protected int getCurrentItemCount() {
        return currentItemCount;
    }


    public void setCurrentItemCount(int count) {
        this.currentItemCount = count;
    }


    public void setMaxItemCount(int count) {
        this.maxItemCount = count;
    }

    @Override
    public void close() throws ItemStreamException {
        super.close();
        currentItemCount = 0;
        try {
            doClose();
        }
        catch (Exception e) {
            throw new ItemStreamException("Error while closing item reader", e);
        }
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        super.open(executionContext);
        try {
            doOpen();
        }
        catch (Exception e) {
            throw new ItemStreamException("Failed to initialize the reader", e);
        }
        if (!isSaveState()) {
            return;
        }

        if (executionContext.containsKey(getExecutionContextKey(READ_COUNT_MAX))) {
            maxItemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT_MAX));
        }

        int itemCount = 0;
        if (executionContext.containsKey(getExecutionContextKey(READ_COUNT))) {
            itemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT));
        }
        else if(currentItemCount > 0) {
            itemCount = currentItemCount;
        }

        if (itemCount > 0 && itemCount < maxItemCount) {
            try {
                jumpToItem(itemCount);
            }
            catch (Exception e) {
                throw new ItemStreamException("Could not move to stored position on restart", e);
            }
        }

        currentItemCount = itemCount;

    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        super.update(executionContext);
        if (saveState) {
            Assert.notNull(executionContext, "ExecutionContext must not be null");
            executionContext.putInt(getExecutionContextKey(READ_COUNT), currentItemCount);
            if (maxItemCount < Integer.MAX_VALUE) {
                executionContext.putInt(getExecutionContextKey(READ_COUNT_MAX), maxItemCount);
            }
        }

    }


    public void setSaveState(boolean saveState) {
        this.saveState = saveState;
    }

    /**
     * The flag that determines whether to save internal state for restarts.
     * @return true if the flag was set
     */
    public boolean isSaveState() {
        return saveState;
    }

}

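The parent class also holds currentItemCount, which is not thread-safe either. On closer inspection, this parent class is of no use to us: its purpose is to limit the number of records that are iterated. If you do not need that limit you can drop it, so the reader can be rewritten as a thread-safe class: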

public class MyBatisCursorItemReaderThreadSafe<T> implements InitializingBean {

    private String queryId;

    private SqlSessionFactory sqlSessionFactory;
    private ThreadLocal<SqlSession> sqlSession = new ThreadLocal<SqlSession>();

    private ThreadLocal<Map<String, Object>> parameterValues = new ThreadLocal<Map<String, Object>>();

    private ThreadLocal<Cursor<T>> cursor = new ThreadLocal<Cursor<T>>();
    private ThreadLocal<Iterator<T>> cursorIterator = new ThreadLocal<Iterator<T>>();

    public MyBatisCursorItemReaderThreadSafe() {
    }

    public T doRead() throws Exception {
        T next = null;
        if (cursorIterator.get().hasNext()) {
            next = cursorIterator.get().next();
        }
        return next;
    }

    public void doOpen() throws Exception {
        Map<String, Object> parameters = new HashMap<String, Object>();
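        // the ThreadLocal itself is never null; check the value set on this thread
        Map<String, Object> values = parameterValues.get();
        if (values != null) {
            parameters.putAll(values);
        }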
        SqlSession sqlSessionTmp = null;
        Cursor<T> cursorTemp = null;
        Iterator<T> cursorIteratorTmp = null;
        sqlSessionTmp = sqlSessionFactory.openSession(ExecutorType.SIMPLE);
        cursorTemp = sqlSessionTmp.selectCursor(queryId, parameters);
        cursorIteratorTmp = cursorTemp.iterator();

        // publish to the thread-locals only after every step has succeeded
        cursor.set(cursorTemp);
        cursorIterator.set(cursorIteratorTmp);
        this.sqlSession.set(sqlSessionTmp);

    }

    public void doClose() throws Exception {
        try {
            Cursor<T> cursorTemp = cursor.get();
            if (null != cursorTemp) {
                cursorTemp.close();
            }
            // guard against doClose being called on a thread that never opened
            SqlSession sqlSessionTmp = sqlSession.get();
            if (null != sqlSessionTmp) {
                sqlSessionTmp.close();
            }
        } finally {
            // remove() instead of set(null) so pooled threads keep no stale entries
            cursor.remove();
            cursorIterator.remove();
            sqlSession.remove();
            parameterValues.remove();
        }
    }


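    public void setParameterValues(Map<String, Object> parameterValues) {
        this.parameterValues.set(parameterValues);
    }

    public void setSqlSessionFactory(SqlSessionFactory sqlSessionFactory) {
        this.sqlSessionFactory = sqlSessionFactory;
    }

    public void setQueryId(String queryId) {
        this.queryId = queryId;
    }

    // Required by InitializingBean: fail fast if the bean is not fully configured.
    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(sqlSessionFactory, "A SqlSessionFactory is required.");
        Assert.notNull(queryId, "A queryId is required.");
    }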

}

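Of course there is an even simpler approach: use prototype scope, so that every getBean call creates a new object. Or simply create a new instance with new each time you need one.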
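The "new instance per use" idea can be sketched without Spring. PerUseReader below is a hypothetical, simplified reader with the same kind of unsynchronized state as the classes above; because each task creates its own instance, nothing is shared between threads and no ThreadLocal is needed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical single-threaded reader; each instance keeps its own cursor state. */
class PerUseReader {
    private Iterator<String> cursorIterator;

    void open(List<String> rows) {
        cursorIterator = rows.iterator();
    }

    String read() {
        return cursorIterator.hasNext() ? cursorIterator.next() : null;
    }

    void close() {
        cursorIterator = null;
    }
}

class PerUseReaderDemo {
    /** Reads every row with a reader created just for this call. */
    static List<String> readAll(List<String> rows) {
        PerUseReader reader = new PerUseReader(); // new instance per use, never shared
        reader.open(rows);
        try {
            List<String> out = new ArrayList<>();
            for (String row = reader.read(); row != null; row = reader.read()) {
                out.add(row);
            }
            return out;
        } finally {
            reader.close();
        }
    }

    /** Runs two reads concurrently and returns both results in submission order. */
    static List<List<String>> readConcurrently(List<String> a, List<String> b) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<List<String>> fa = pool.submit(() -> readAll(a));
            Future<List<String>> fb = pool.submit(() -> readAll(b));
            return Arrays.asList(fa.get(), fb.get());
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Each concurrent call gets back exactly its own rows. With Spring, declaring the reader bean with prototype scope aims at the same effect, provided each user obtains its own instance through getBean rather than sharing one injected reference.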

4. References

  • https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-implementation-notes.html
  • http://ju.outofmemory.cn/entry/93137