blob: fce046dd752a92bd030501434f43c92238630b0f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.jdbc2;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteJdbcDriver;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.query.BulkLoadContextCursor;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.util.typedef.CAX;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.resources.IgniteInstanceResource;
/**
* Task for SQL queries execution through {@link IgniteJdbcDriver}.
* <p>
* Not closed cursors will be removed after {@link #RMV_DELAY} milliseconds.
* This parameter can be configured via {@link IgniteSystemProperties#IGNITE_JDBC_DRIVER_CURSOR_REMOVE_DELAY}
* system property.
*/
class JdbcQueryTask implements IgniteCallable<JdbcQueryTaskResult> {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
/** How long to store open cursor. */
private static final long RMV_DELAY = IgniteSystemProperties.getLong(
IgniteSystemProperties.IGNITE_JDBC_DRIVER_CURSOR_REMOVE_DELAY, 600000);
/** Scheduler. */
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1);
/** Open cursors. */
private static final ConcurrentMap<UUID, Cursor> CURSORS = new ConcurrentHashMap<>();
/** Ignite. */
@IgniteInstanceResource
private Ignite ignite;
/** Uuid. */
private final UUID uuid;
/** Cache name. */
private final String cacheName;
/** Schema name. */
private final String schemaName;
/** Sql. */
private final String sql;
/** Operation type flag - query or not. */
private Boolean isQry;
/** Args. */
private final Object[] args;
/** Fetch size. */
private final int fetchSize;
/** Local execution flag. */
private final boolean loc;
/** Local query flag. */
private final boolean locQry;
/** Collocated query flag. */
private final boolean collocatedQry;
/** Distributed joins flag. */
private final boolean distributedJoins;
/**
* @param ignite Ignite.
* @param cacheName Cache name.
* @param schemaName Schema name.
* @param sql Sql query.
* @param isQry Operation type flag - query or not - to enforce query type check.
* @param loc Local execution flag.
* @param args Args.
* @param fetchSize Fetch size.
* @param uuid UUID.
* @param locQry Local query flag.
* @param collocatedQry Collocated query flag.
* @param distributedJoins Distributed joins flag.
*/
public JdbcQueryTask(Ignite ignite, String cacheName, String schemaName, String sql, Boolean isQry, boolean loc,
Object[] args, int fetchSize, UUID uuid, boolean locQry, boolean collocatedQry, boolean distributedJoins) {
this.ignite = ignite;
this.args = args;
this.uuid = uuid;
this.cacheName = cacheName;
this.schemaName = schemaName;
this.sql = sql;
this.isQry = isQry;
this.fetchSize = fetchSize;
this.loc = loc;
this.locQry = locQry;
this.collocatedQry = collocatedQry;
this.distributedJoins = distributedJoins;
}
/** {@inheritDoc} */
@Override public JdbcQueryTaskResult call() throws Exception {
Cursor cursor = CURSORS.get(uuid);
List<String> tbls = null;
List<String> cols = null;
List<String> types = null;
boolean first;
if (first = (cursor == null)) {
IgniteCache<?, ?> cache = ignite.cache(cacheName);
// Don't create caches on server nodes in order to avoid of data rebalancing.
boolean start = ignite.configuration().isClientMode();
if (cache == null && cacheName == null)
cache = ((IgniteKernal)ignite).context().cache().getOrStartPublicCache(start, !loc && locQry);
if (cache == null) {
if (cacheName == null)
throw new SQLException("Failed to execute query. No suitable caches found.");
else
throw new SQLException("Cache not found [cacheName=" + cacheName + ']');
}
SqlFieldsQuery qry = (isQry != null ? new SqlFieldsQueryEx(sql, isQry) : new SqlFieldsQuery(sql))
.setArgs(args);
qry.setPageSize(fetchSize);
qry.setLocal(locQry);
qry.setCollocated(collocatedQry);
qry.setDistributedJoins(distributedJoins);
qry.setEnforceJoinOrder(enforceJoinOrder());
qry.setLazy(lazy());
qry.setSchema(schemaName);
FieldsQueryCursor<List<?>> fldQryCursor = cache.withKeepBinary().query(qry);
if (fldQryCursor instanceof BulkLoadContextCursor) {
fldQryCursor.close();
throw new SQLException("COPY command is currently supported only in thin JDBC driver.");
}
QueryCursorImpl<List<?>> qryCursor = (QueryCursorImpl<List<?>>)fldQryCursor;
if (isQry == null)
isQry = qryCursor.isQuery();
CURSORS.put(uuid, cursor = new Cursor(qryCursor, qryCursor.iterator()));
}
if (first || updateMetadata()) {
Collection<GridQueryFieldMetadata> meta = cursor.queryCursor().fieldsMeta();
tbls = new ArrayList<>(meta.size());
cols = new ArrayList<>(meta.size());
types = new ArrayList<>(meta.size());
for (GridQueryFieldMetadata desc : meta) {
tbls.add(desc.typeName());
cols.add(desc.fieldName().toUpperCase());
types.add(desc.fieldTypeName());
}
}
List<List<?>> rows = new ArrayList<>();
for (List<?> row : cursor) {
List<Object> row0 = new ArrayList<>(row.size());
for (Object val : row)
row0.add(val == null || JdbcUtils.isSqlType(val.getClass()) ? val : val.toString());
rows.add(row0);
if (rows.size() == fetchSize) // If fetchSize is 0 then unlimited
break;
}
boolean finished = !cursor.hasNext();
if (finished)
remove(uuid, cursor);
else if (first) {
if (!loc)
scheduleRemoval(uuid);
}
else if (!loc && !CURSORS.replace(uuid, cursor, new Cursor(cursor.cursor, cursor.iter)))
assert !CURSORS.containsKey(uuid) : "Concurrent cursor modification.";
assert isQry != null : "Query flag must be set prior to returning result";
return new JdbcQueryTaskResult(uuid, finished, isQry, rows, cols, tbls, types);
}
/**
* @return Enforce join order flag (SQL hit).
*/
protected boolean enforceJoinOrder() {
return false;
}
/**
* @return Lazy query execution flag (SQL hit).
*/
protected boolean lazy() {
return false;
}
/**
* @return Flag to update metadata on demand.
*/
protected boolean updateMetadata() {
return false;
}
/**
* @return Flag to update enable server side updates.
*/
protected boolean skipReducerOnUpdate() {
return false;
}
/**
* Schedules removal of stored cursor in case of remote query execution.
*
* @param uuid Cursor UUID.
*/
static void scheduleRemoval(final UUID uuid) {
scheduleRemoval(uuid, RMV_DELAY);
}
/**
* Schedules removal of stored cursor in case of remote query execution.
*
* @param uuid Cursor UUID.
* @param delay Delay in milliseconds.
*/
private static void scheduleRemoval(final UUID uuid, long delay) {
SCHEDULER.schedule(new CAX() {
@Override public void applyx() {
while (true) {
Cursor c = CURSORS.get(uuid);
if (c == null)
break;
// If the cursor was accessed since last scheduling then reschedule.
long untouchedTime = U.currentTimeMillis() - c.lastAccessTime;
if (untouchedTime < RMV_DELAY) {
scheduleRemoval(uuid, RMV_DELAY - untouchedTime);
break;
}
else if (remove(uuid, c))
break;
}
}
}, delay, TimeUnit.MILLISECONDS);
}
/**
* @param uuid Cursor UUID.
* @param c Cursor.
* @return {@code true} If succeeded.
*/
private static boolean remove(UUID uuid, Cursor c) {
boolean rmv = CURSORS.remove(uuid, c);
if (rmv)
c.cursor.close();
return rmv;
}
/**
* @param uuid Cursor UUID.
* @param c Cursor.
*/
static void addCursor(UUID uuid, Cursor c) {
CURSORS.putIfAbsent(uuid, c);
}
/**
* Closes and removes cursor.
*
* @param uuid Cursor UUID.
*/
static void remove(UUID uuid) {
Cursor c = CURSORS.remove(uuid);
if (c != null)
c.cursor.close();
}
/**
* Cursor.
*/
static final class Cursor implements Iterable<List<?>> {
/** Cursor. */
final QueryCursor<List<?>> cursor;
/** Iterator. */
final Iterator<List<?>> iter;
/** Last access time. */
final long lastAccessTime;
/**
* @param cursor Cursor.
* @param iter Iterator.
*/
Cursor(QueryCursor<List<?>> cursor, Iterator<List<?>> iter) {
this.cursor = cursor;
this.iter = iter;
this.lastAccessTime = U.currentTimeMillis();
}
/** {@inheritDoc} */
@Override public Iterator<List<?>> iterator() {
return iter;
}
/**
* @return {@code True} if cursor has next element.
*/
public boolean hasNext() {
return iter.hasNext();
}
/**
* @return Cursor.
*/
public QueryCursorImpl<List<?>> queryCursor() {
return (QueryCursorImpl<List<?>>)cursor;
}
}
}