blob: 3fba0b17a5cc7cc1da7b589dd030573132f61536 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.h2;
import java.lang.reflect.Field;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import org.apache.ignite.internal.processors.tracing.Tracing;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.h2.api.ErrorCode;
import org.h2.engine.Session;
import org.h2.jdbc.JdbcResultSet;
import org.h2.result.ResultInterface;
import org.h2.value.Value;
import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_PAGE_ROWS;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_ITER_CLOSE;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_PAGE_FETCH;
/**
* Iterator over result set.
*/
public abstract class H2ResultSetIterator<T> extends GridIteratorAdapter<T> implements GridCloseableIterator<T> {
/** */
private static final Field RESULT_FIELD;
/*
* Initialize.
*/
static {
try {
RESULT_FIELD = JdbcResultSet.class.getDeclaredField("result");
RESULT_FIELD.setAccessible(true);
}
catch (NoSuchFieldException e) {
throw new IllegalStateException("Check H2 version in classpath.", e);
}
}
/** */
private static final long serialVersionUID = 0L;
/** */
private ResultInterface res;
/** */
private ResultSet data;
/** */
protected Object[] row;
/** */
private List<Object[]> page;
/** */
private boolean hasRow;
/** Page size. */
private final int pageSize;
/** Column count. */
private final int colCnt;
/** Page row iterator. */
private Iterator<Object[]> rowIter;
/** Current H2 session. */
private final Session ses;
/** Closed. */
private boolean closed;
/** Canceled. */
private boolean canceled;
/** Fetch size interceptor. */
final H2QueryFetchSizeInterceptor fetchSizeInterceptor;
/** Tracing processor. */
protected final Tracing tracing;
/**
* @param data Data array.
* @param log Logger.
* @param h2 Indexing H2.
* @param pageSize Page size.
* @param tracing Tracing processor.
* @throws IgniteCheckedException If failed.
*/
protected H2ResultSetIterator(
ResultSet data,
int pageSize,
IgniteLogger log,
IgniteH2Indexing h2,
H2QueryInfo qryInfo,
Tracing tracing
)
throws IgniteCheckedException {
this.pageSize = pageSize;
this.data = data;
this.tracing = tracing;
try {
res = (ResultInterface)RESULT_FIELD.get(data);
}
catch (IllegalAccessException e) {
throw new IllegalStateException(e); // Must not happen.
}
if (data != null) {
try {
colCnt = data.getMetaData().getColumnCount();
ses = H2Utils.session(data.getStatement().getConnection());
page = new ArrayList<>(pageSize);
}
catch (SQLException e) {
throw new IgniteCheckedException(e);
}
}
else {
colCnt = 0;
page = null;
row = null;
ses = null;
}
assert log != null;
assert h2 != null;
assert qryInfo != null;
fetchSizeInterceptor = new H2QueryFetchSizeInterceptor(h2, qryInfo, log);
}
/**
* @return {@code true} if the next page is available.
* @throws IgniteCheckedException On cancel.
*/
private boolean fetchPage() throws IgniteCheckedException {
lockTables();
try (TraceSurroundings ignored = MTC.support(tracing.create(SQL_PAGE_FETCH, MTC.span()))) {
GridH2Table.checkTablesVersions(ses);
page.clear();
try {
if (data.isClosed())
return false;
}
catch (SQLException e) {
if (e.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
throw new QueryCancelledException();
throw new IgniteSQLException(e);
}
for (int i = 0; i < pageSize; ++i) {
try {
if (!data.next())
break;
row = new Object[colCnt];
readRow();
page.add(row);
}
catch (SQLException e) {
close();
if (e.getCause() instanceof IgniteSQLException)
throw (IgniteSQLException)e.getCause();
if (e.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
throw new QueryCancelledException();
throw new IgniteSQLException(e);
}
}
MTC.span().addTag(SQL_PAGE_ROWS, () -> Integer.toString(page.size()));
if (F.isEmpty(page)) {
rowIter = null;
return false;
}
else {
rowIter = page.iterator();
return true;
}
}
finally {
unlockTables();
}
}
/**
* @throws SQLException On error.
*/
private void readRow() throws SQLException {
if (res != null) {
Value[] values = res.currentRow();
for (int c = 0; c < row.length; c++) {
Value val = values[c];
if (val instanceof GridH2ValueCacheObject) {
GridH2ValueCacheObject valCacheObj = (GridH2ValueCacheObject)values[c];
row[c] = valCacheObj.getObject(true);
}
else
row[c] = val.getObject();
}
}
else {
for (int c = 0; c < row.length; c++)
row[c] = data.getObject(c + 1);
}
}
/** */
public void lockTables() {
if (!isClosed() && ses.isLazyQueryExecution())
GridH2Table.readLockTables(ses);
}
/** */
public void unlockTables() {
if (ses.isLazyQueryExecution())
GridH2Table.unlockTables(ses);
}
/**
* @return {@code true} If next row was fetched successfully.
* @throws IgniteCheckedException On error.
*/
private synchronized boolean fetchNext() throws IgniteCheckedException {
if (canceled)
throw new QueryCancelledException();
if (rowIter != null && rowIter.hasNext()) {
row = rowIter.next();
fetchSizeInterceptor.checkOnFetchNext();
return true;
}
if (!fetchPage()) {
closeInternal();
return false;
}
if (rowIter != null && rowIter.hasNext()) {
row = rowIter.next();
fetchSizeInterceptor.checkOnFetchNext();
return true;
}
else
return false;
}
/**
* @return Row.
*/
protected abstract T createRow();
/**
* @throws IgniteCheckedException On error.
*/
public void onClose() throws IgniteCheckedException {
if (data == null)
// Nothing to close.
return;
lockTables();
try {
fetchSizeInterceptor.checkOnClose();
data.close();
}
catch (SQLException e) {
throw new IgniteSQLException(e);
}
finally {
res = null;
data = null;
page = null;
unlockTables();
}
}
/** {@inheritDoc} */
@Override public synchronized void close() throws IgniteCheckedException {
if (closed)
return;
canceled = true;
closeInternal();
}
/**
* @throws IgniteCheckedException On error.
*/
private synchronized void closeInternal() throws IgniteCheckedException {
if (closed)
return;
try (TraceSurroundings ignored = MTC.support(tracing.create(SQL_ITER_CLOSE, MTC.span()))) {
closed = true;
onClose();
}
}
/** {@inheritDoc} */
@Override public boolean isClosed() {
return closed;
}
/** {@inheritDoc} */
@Override public synchronized boolean hasNextX() throws IgniteCheckedException {
if (canceled)
throw new QueryCancelledException();
if (closed)
return false;
return hasRow || (hasRow = fetchNext());
}
/** {@inheritDoc} */
@Override public T nextX() throws IgniteCheckedException {
if (!hasNextX())
throw new NoSuchElementException();
hasRow = false;
return createRow();
}
/** {@inheritDoc} */
@Override public void removeX() throws IgniteCheckedException {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(H2ResultSetIterator.class, this);
}
}