blob: 05ca3aa4f9c80bb9f207adbadb1c95d1542b6a27 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.rest.handlers.query;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.rest.GridRestCommand;
import org.apache.ignite.internal.processors.rest.GridRestResponse;
import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandlerAdapter;
import org.apache.ignite.internal.processors.rest.request.GridRestRequest;
import org.apache.ignite.internal.processors.rest.request.RestQueryRequest;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CLOSE_SQL_QUERY;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.EXECUTE_SCAN_QUERY;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.EXECUTE_SQL_FIELDS_QUERY;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.EXECUTE_SQL_QUERY;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.FETCH_SQL_QUERY;
/**
* Query command handler.
*/
public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/** Supported commands. */
private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(EXECUTE_SQL_QUERY,
EXECUTE_SQL_FIELDS_QUERY,
EXECUTE_SCAN_QUERY,
FETCH_SQL_QUERY,
CLOSE_SQL_QUERY);
/** Query ID sequence. */
private static final AtomicLong qryIdGen = new AtomicLong();
/** Current queries cursors. */
private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs = new ConcurrentHashMap<>();
/**
* @param ctx Context.
*/
public QueryCommandHandler(GridKernalContext ctx) {
super(ctx);
final long idleQryCurTimeout = ctx.config().getConnectorConfiguration().getIdleQueryCursorTimeout();
long idleQryCurCheckFreq = ctx.config().getConnectorConfiguration().getIdleQueryCursorCheckFrequency();
ctx.timeout().schedule(new Runnable() {
@Override public void run() {
long time = U.currentTimeMillis();
for (Map.Entry<Long, QueryCursorIterator> e : qryCurs.entrySet()) {
QueryCursorIterator qryCurIt = e.getValue();
long createTime = qryCurIt.timestamp();
if (time > createTime + idleQryCurTimeout && qryCurIt.tryLock()) {
try {
qryCurIt.timestamp(-1);
qryCurs.remove(e.getKey(), qryCurIt);
qryCurIt.close();
}
finally {
qryCurIt.unlock();
}
}
}
}
}, idleQryCurCheckFreq, idleQryCurCheckFreq);
}
/**
* @param cur Current cursor.
* @param req Sql request.
* @param qryId Query id.
* @param qryCurs Query cursors.
* @return Query result with items.
*/
private static CacheQueryResult createQueryResult(
Iterator cur, RestQueryRequest req, Long qryId, ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
CacheQueryResult res = new CacheQueryResult();
List<Object> items = new ArrayList<>();
for (int i = 0; i < req.pageSize() && cur.hasNext(); ++i)
items.add(cur.next());
res.setItems(items);
res.setLast(!cur.hasNext());
res.setQueryId(qryId);
if (!cur.hasNext())
removeQueryCursor(qryId, qryCurs);
return res;
}
/**
* Removes query cursor.
*
* @param qryId Query id.
* @param qryCurs Query cursors.
*/
private static void removeQueryCursor(Long qryId, ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
QueryCursorIterator qryCurIt = qryCurs.get(qryId);
if (qryCurIt == null)
return;
qryCurIt.lock();
try {
if (qryCurIt.timestamp() == -1)
return;
qryCurIt.close();
qryCurs.remove(qryId);
}
finally {
qryCurIt.unlock();
}
}
/**
* Creates class instance.
*
* @param cls Target class.
* @param clsName Implementing class name.
* @return Class instance.
* @throws IgniteException If failed.
*/
private static <T> T instance(Class<? extends T> cls, String clsName) throws IgniteException {
try {
Class<?> implCls = Class.forName(clsName);
if (!cls.isAssignableFrom(implCls))
throw new IgniteException("Failed to create instance (target class does not extend or implement " +
"required class or interface) [cls=" + cls.getName() + ", clsName=" + clsName + ']');
Constructor<?> ctor = implCls.getConstructor();
return (T)ctor.newInstance();
}
catch (ClassNotFoundException e) {
throw new IgniteException("Failed to find target class: " + clsName, e);
}
catch (NoSuchMethodException e) {
throw new IgniteException("Failed to find constructor for provided arguments " +
"[clsName=" + clsName + ']', e);
}
catch (InstantiationException e) {
throw new IgniteException("Failed to instantiate target class " +
"[clsName=" + clsName + ']', e);
}
catch (IllegalAccessException e) {
throw new IgniteException("Failed to instantiate class (constructor is not available) " +
"[clsName=" + clsName + ']', e);
}
catch (InvocationTargetException e) {
throw new IgniteException("Failed to instantiate class (constructor threw an exception) " +
"[clsName=" + clsName + ']', e.getCause());
}
}
/** {@inheritDoc} */
@Override public Collection<GridRestCommand> supportedCommands() {
return SUPPORTED_COMMANDS;
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<GridRestResponse> handleAsync(GridRestRequest req) {
assert req != null;
assert SUPPORTED_COMMANDS.contains(req.command());
assert req instanceof RestQueryRequest : "Invalid type of query request.";
if (req.command() != CLOSE_SQL_QUERY) {
Integer pageSize = ((RestQueryRequest)req).pageSize();
if (pageSize == null)
return new GridFinishedFuture<>(
new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("pageSize"))
);
}
switch (req.command()) {
case EXECUTE_SQL_QUERY:
case EXECUTE_SQL_FIELDS_QUERY:
case EXECUTE_SCAN_QUERY: {
return ctx.closure().callLocalSafe(
new ExecuteQueryCallable(ctx, (RestQueryRequest)req, qryCurs), false);
}
case FETCH_SQL_QUERY: {
return ctx.closure().callLocalSafe(
new FetchQueryCallable((RestQueryRequest)req, qryCurs), false);
}
case CLOSE_SQL_QUERY: {
return ctx.closure().callLocalSafe(
new CloseQueryCallable((RestQueryRequest)req, qryCurs), false);
}
}
return new GridFinishedFuture<>();
}
/**
* Execute query callable.
*/
private static class ExecuteQueryCallable implements Callable<GridRestResponse> {
/** Kernal context. */
private GridKernalContext ctx;
/** Execute query request. */
private RestQueryRequest req;
/** Current queries cursors. */
private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs;
/**
* @param ctx Kernal context.
* @param req Execute query request.
* @param qryCurs Query cursors.
*/
public ExecuteQueryCallable(GridKernalContext ctx, RestQueryRequest req,
ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
this.ctx = ctx;
this.req = req;
this.qryCurs = qryCurs;
}
/** {@inheritDoc} */
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Override public GridRestResponse call() throws Exception {
final long qryId = qryIdGen.getAndIncrement();
try {
Query qry;
switch (req.queryType()) {
case SQL:
qry = new SqlQuery(req.typeName(), req.sqlQuery());
((SqlQuery)qry).setArgs(req.arguments());
((SqlQuery)qry).setDistributedJoins(req.distributedJoins());
break;
case SQL_FIELDS:
qry = new SqlFieldsQuery(req.sqlQuery());
((SqlFieldsQuery)qry).setArgs(req.arguments());
((SqlFieldsQuery)qry).setDistributedJoins(req.distributedJoins());
break;
case SCAN:
IgniteBiPredicate pred = null;
if (req.className() != null)
pred = instance(IgniteBiPredicate.class, req.className());
qry = new ScanQuery(pred);
break;
default:
throw new IgniteException("Incorrect query type [type=" + req.queryType() + "]");
}
String cacheName = req.cacheName() == null ? DFLT_CACHE_NAME : req.cacheName();
IgniteCache<Object, Object> cache = ctx.grid().cache(cacheName);
if (cache == null)
return new GridRestResponse(GridRestResponse.STATUS_FAILED,
"Failed to find cache with name: " + cacheName);
final QueryCursor qryCur = cache.query(qry);
Iterator cur = qryCur.iterator();
QueryCursorIterator qryCurIt = new QueryCursorIterator(qryCur, cur);
qryCurIt.lock();
try {
qryCurs.put(qryId, qryCurIt);
CacheQueryResult res = createQueryResult(cur, req, qryId, qryCurs);
switch (req.queryType()) {
case SQL:
case SQL_FIELDS:
List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl)qryCur).fieldsMeta();
res.setFieldsMetadata(convertMetadata(fieldsMeta));
break;
case SCAN:
CacheQueryFieldsMetaResult keyField = new CacheQueryFieldsMetaResult();
keyField.setFieldName("key");
CacheQueryFieldsMetaResult valField = new CacheQueryFieldsMetaResult();
valField.setFieldName("value");
res.setFieldsMetadata(U.sealList(keyField, valField));
break;
}
return new GridRestResponse(res);
}
finally {
qryCurIt.unlock();
}
}
catch (Exception e) {
removeQueryCursor(qryId, qryCurs);
SQLException sqlErr = X.cause(e, SQLException.class);
return new GridRestResponse(GridRestResponse.STATUS_FAILED,
sqlErr != null ? sqlErr.getMessage() : e.getMessage());
}
}
/**
* @param meta Internal query field metadata.
* @return Rest query field metadata.
*/
private Collection<CacheQueryFieldsMetaResult> convertMetadata(Collection<GridQueryFieldMetadata> meta) {
List<CacheQueryFieldsMetaResult> res = new ArrayList<>();
if (meta != null) {
for (GridQueryFieldMetadata info : meta)
res.add(new CacheQueryFieldsMetaResult(info));
}
return res;
}
}
/**
* Close query callable.
*/
private static class CloseQueryCallable implements Callable<GridRestResponse> {
/** Current queries cursors. */
private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs;
/** Execute query request. */
private RestQueryRequest req;
/**
* @param req Execute query request.
* @param qryCurs Query cursors.
*/
public CloseQueryCallable(RestQueryRequest req, ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
this.req = req;
this.qryCurs = qryCurs;
}
/** {@inheritDoc} */
@Override public GridRestResponse call() throws Exception {
try {
QueryCursorIterator qryCurIt = qryCurs.get(req.queryId());
if (qryCurIt == null)
return new GridRestResponse(true);
qryCurIt.lock();
try {
if (qryCurIt.timestamp() == -1)
return new GridRestResponse(true);
qryCurIt.close();
qryCurs.remove(req.queryId());
}
finally {
qryCurIt.unlock();
}
return new GridRestResponse(true);
}
catch (Exception e) {
removeQueryCursor(req.queryId(), qryCurs);
return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
}
}
}
/**
* Fetch query callable.
*/
private static class FetchQueryCallable implements Callable<GridRestResponse> {
/** Current queries cursors. */
private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs;
/** Execute query request. */
private RestQueryRequest req;
/**
* @param req Execute query request.
* @param qryCurs Query cursors.
*/
public FetchQueryCallable(RestQueryRequest req, ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
this.req = req;
this.qryCurs = qryCurs;
}
/** {@inheritDoc} */
@Override public GridRestResponse call() throws Exception {
try {
QueryCursorIterator qryCurIt = qryCurs.get(req.queryId());
if (qryCurIt == null)
return new GridRestResponse(GridRestResponse.STATUS_FAILED,
"Failed to find query with ID: " + req.queryId() + ". " +
"Possible reasons: wrong query ID, no more data to fetch from query, query was closed by timeout" +
" or node where query was executed is not found.");
qryCurIt.lock();
try {
if (qryCurIt.timestamp() == -1)
return new GridRestResponse(GridRestResponse.STATUS_FAILED,
"Query with ID: " + req.queryId() + " was closed by timeout");
qryCurIt.timestamp(U.currentTimeMillis());
Iterator cur = qryCurIt.iterator();
CacheQueryResult res = createQueryResult(cur, req, req.queryId(), qryCurs);
return new GridRestResponse(res);
}
finally {
qryCurIt.unlock();
}
}
catch (Exception e) {
removeQueryCursor(req.queryId(), qryCurs);
return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
}
}
}
/**
* Query cursor iterator.
*/
private static class QueryCursorIterator extends ReentrantLock {
/** */
private static final long serialVersionUID = 0L;
/** Query cursor. */
private QueryCursor cur;
/** Query iterator. */
private Iterator it;
/** Last timestamp. */
private volatile long ts;
/**
* @param cur Query cursor.
* @param it Query iterator.
*/
public QueryCursorIterator(QueryCursor cur, Iterator it) {
this.cur = cur;
this.it = it;
ts = U.currentTimeMillis();
}
/**
* @return Query iterator.
*/
public Iterator iterator() {
return it;
}
/**
* @return Timestamp.
*/
public long timestamp() {
return ts;
}
/**
* @param time Current time or -1 if cursor is closed.
*/
public void timestamp(long time) {
ts = time;
}
/**
* Close query cursor.
*/
public void close() {
cur.close();
}
}
}