blob: b26fca548dcae025c265ac5ca2161c12f88e9e78 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.ignite.internal.jdbc2;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteJdbcDriver;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.query.BulkLoadContextCursor;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.util.typedef.CAX;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.resources.IgniteInstanceResource;
* Task for SQL queries execution through {@link IgniteJdbcDriver}.
* <p>
* Not closed cursors will be removed after {@link #RMV_DELAY} milliseconds.
* This parameter can be configured via {@link IgniteSystemProperties#IGNITE_JDBC_DRIVER_CURSOR_REMOVE_DELAY}
* system property.
class JdbcQueryTask implements IgniteCallable<JdbcQueryTaskResult> {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
/** How long to store open cursor. */
private static final long RMV_DELAY = IgniteSystemProperties.getLong(
/** Scheduler. */
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1);
/** Open cursors. */
private static final ConcurrentMap<UUID, Cursor> CURSORS = new ConcurrentHashMap<>();
/** Ignite. */
private Ignite ignite;
/** Uuid. */
private final UUID uuid;
/** Cache name. */
private final String cacheName;
/** Schema name. */
private final String schemaName;
/** Sql. */
private final String sql;
/** Operation type flag - query or not. */
private Boolean isQry;
/** Args. */
private final Object[] args;
/** Fetch size. */
private final int fetchSize;
/** Local execution flag. */
private final boolean loc;
/** Local query flag. */
private final boolean locQry;
/** Collocated query flag. */
private final boolean collocatedQry;
/** Distributed joins flag. */
private final boolean distributedJoins;
* @param ignite Ignite.
* @param cacheName Cache name.
* @param schemaName Schema name.
* @param sql Sql query.
* @param isQry Operation type flag - query or not - to enforce query type check.
* @param loc Local execution flag.
* @param args Args.
* @param fetchSize Fetch size.
* @param uuid UUID.
* @param locQry Local query flag.
* @param collocatedQry Collocated query flag.
* @param distributedJoins Distributed joins flag.
public JdbcQueryTask(Ignite ignite, String cacheName, String schemaName, String sql, Boolean isQry, boolean loc,
Object[] args, int fetchSize, UUID uuid, boolean locQry, boolean collocatedQry, boolean distributedJoins) {
this.ignite = ignite;
this.args = args;
this.uuid = uuid;
this.cacheName = cacheName;
this.schemaName = schemaName;
this.sql = sql;
this.isQry = isQry;
this.fetchSize = fetchSize;
this.loc = loc;
this.locQry = locQry;
this.collocatedQry = collocatedQry;
this.distributedJoins = distributedJoins;
/** {@inheritDoc} */
@Override public JdbcQueryTaskResult call() throws Exception {
Cursor cursor = CURSORS.get(uuid);
List<String> tbls = null;
List<String> cols = null;
List<String> types = null;
boolean first;
if (first = (cursor == null)) {
IgniteCache<?, ?> cache = ignite.cache(cacheName);
// Don't create caches on server nodes in order to avoid of data rebalancing.
boolean start = ignite.configuration().isClientMode();
if (cache == null && cacheName == null)
cache = ((IgniteKernal)ignite).context().cache().getOrStartPublicCache(start);
if (cache == null) {
if (cacheName == null)
throw new SQLException("Failed to execute query. No suitable caches found.");
throw new SQLException("Cache not found [cacheName=" + cacheName + ']');
SqlFieldsQuery qry = (isQry != null ? new SqlFieldsQueryEx(sql, isQry) : new SqlFieldsQuery(sql))
FieldsQueryCursor<List<?>> fldQryCursor = cache.withKeepBinary().query(qry);
if (fldQryCursor instanceof BulkLoadContextCursor) {
throw new SQLException("COPY command is currently supported only in thin JDBC driver.");
QueryCursorImpl<List<?>> qryCursor = (QueryCursorImpl<List<?>>)fldQryCursor;
if (isQry == null)
isQry = qryCursor.isQuery();
CURSORS.put(uuid, cursor = new Cursor(qryCursor, qryCursor.iterator()));
if (first || updateMetadata()) {
Collection<GridQueryFieldMetadata> meta = cursor.queryCursor().fieldsMeta();
tbls = new ArrayList<>(meta.size());
cols = new ArrayList<>(meta.size());
types = new ArrayList<>(meta.size());
for (GridQueryFieldMetadata desc : meta) {
List<List<?>> rows = new ArrayList<>();
for (List<?> row : cursor) {
List<Object> row0 = new ArrayList<>(row.size());
for (Object val : row)
row0.add(val == null || JdbcUtils.isSqlType(val.getClass()) ? val : val.toString());
if (rows.size() == fetchSize) // If fetchSize is 0 then unlimited
boolean finished = !cursor.hasNext();
if (finished)
remove(uuid, cursor);
else if (first) {
if (!loc)
else if (!loc && !CURSORS.replace(uuid, cursor, new Cursor(cursor.cursor, cursor.iter)))
assert !CURSORS.containsKey(uuid) : "Concurrent cursor modification.";
assert isQry != null : "Query flag must be set prior to returning result";
return new JdbcQueryTaskResult(uuid, finished, isQry, rows, cols, tbls, types);
* @return Enforce join order flag (SQL hit).
protected boolean enforceJoinOrder() {
return false;
* @return Lazy query execution flag (SQL hit).
protected boolean lazy() {
return false;
* @return Flag to update metadata on demand.
protected boolean updateMetadata() {
return false;
* @return Flag to update enable server side updates.
protected boolean skipReducerOnUpdate() {
return false;
* Schedules removal of stored cursor in case of remote query execution.
* @param uuid Cursor UUID.
static void scheduleRemoval(final UUID uuid) {
scheduleRemoval(uuid, RMV_DELAY);
* Schedules removal of stored cursor in case of remote query execution.
* @param uuid Cursor UUID.
* @param delay Delay in milliseconds.
private static void scheduleRemoval(final UUID uuid, long delay) {
SCHEDULER.schedule(new CAX() {
@Override public void applyx() {
while (true) {
Cursor c = CURSORS.get(uuid);
if (c == null)
// If the cursor was accessed since last scheduling then reschedule.
long untouchedTime = U.currentTimeMillis() - c.lastAccessTime;
if (untouchedTime < RMV_DELAY) {
scheduleRemoval(uuid, RMV_DELAY - untouchedTime);
else if (remove(uuid, c))
}, delay, TimeUnit.MILLISECONDS);
* @param uuid Cursor UUID.
* @param c Cursor.
* @return {@code true} If succeeded.
private static boolean remove(UUID uuid, Cursor c) {
boolean rmv = CURSORS.remove(uuid, c);
if (rmv)
return rmv;
* @param uuid Cursor UUID.
* @param c Cursor.
static void addCursor(UUID uuid, Cursor c) {
CURSORS.putIfAbsent(uuid, c);
* Closes and removes cursor.
* @param uuid Cursor UUID.
static void remove(UUID uuid) {
Cursor c = CURSORS.remove(uuid);
if (c != null)
* Cursor.
static final class Cursor implements Iterable<List<?>> {
/** Cursor. */
final QueryCursor<List<?>> cursor;
/** Iterator. */
final Iterator<List<?>> iter;
/** Last access time. */
final long lastAccessTime;
* @param cursor Cursor.
* @param iter Iterator.
Cursor(QueryCursor<List<?>> cursor, Iterator<List<?>> iter) {
this.cursor = cursor;
this.iter = iter;
this.lastAccessTime = U.currentTimeMillis();
/** {@inheritDoc} */
@Override public Iterator<List<?>> iterator() {
return iter;
* @return {@code True} if cursor has next element.
public boolean hasNext() {
return iter.hasNext();
* @return Cursor.
public QueryCursorImpl<List<?>> queryCursor() {
return (QueryCursorImpl<List<?>>)cursor;