blob: 4fc1054cd69e04f1d1988f1947d755b7f8843707 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import static org.apache.ignite.internal.processors.cache.QueryCursorImpl.State.CLOSED;
import static org.apache.ignite.internal.processors.cache.QueryCursorImpl.State.EXECUTION;
import static org.apache.ignite.internal.processors.cache.QueryCursorImpl.State.IDLE;
import static org.apache.ignite.internal.processors.cache.QueryCursorImpl.State.RESULT_READY;
/**
 * Query cursor implementation.
 * <p>
 * The cursor tracks its lifecycle in the {@link #state} field: it starts as {@link State#IDLE},
 * becomes {@link State#EXECUTION} while the iterator is being obtained from the executor,
 * {@link State#RESULT_READY} once the iterator is available, and {@link State#CLOSED} after
 * {@link #close()}. All transitions are performed with compare-and-set operations.
 *
 * @param <T> Type of the elements returned by the cursor.
 */
public class QueryCursorImpl<T> implements QueryCursorEx<T>, FieldsQueryCursor<T> {
    /** Updater used for compare-and-set transitions of the {@link #state} field. */
    private static final AtomicReferenceFieldUpdater<QueryCursorImpl, State> STATE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(QueryCursorImpl.class, State.class, "state");

    /** Query executor. */
    private final Iterable<T> iterExec;

    /** Result type flag - result set or update counter. */
    private final boolean isQry;

    /** Iterator obtained from the query executor; assigned in {@link #iterator()}. */
    private Iterator<T> iter;

    /** Current cursor state. */
    private volatile State state = IDLE;

    /** SQL Fields query result metadata; stays {@code null} until {@link #fieldsMeta(List)} sets it. */
    private List<GridQueryFieldMetadata> fieldsMeta;

    /** Cancellation closure, may be {@code null}. */
    private final GridQueryCancel cancel;

    /**
     * Creates a cursor for a query (not an update operation) with a cancellation closure.
     *
     * @param iterExec Query executor.
     * @param cancel Cancellation closure.
     */
    public QueryCursorImpl(Iterable<T> iterExec, GridQueryCancel cancel) {
        this(iterExec, cancel, true);
    }

    /**
     * Creates a cursor for a query (not an update operation) without a cancellation closure.
     *
     * @param iterExec Query executor.
     */
    public QueryCursorImpl(Iterable<T> iterExec) {
        this(iterExec, null, true);
    }

    /**
     * @param iterExec Query executor.
     * @param cancel Cancellation closure.
     * @param isQry Result type flag - {@code true} for query, {@code false} for update operation.
     */
    public QueryCursorImpl(Iterable<T> iterExec, GridQueryCancel cancel, boolean isQry) {
        this.isQry = isQry;
        this.cancel = cancel;
        this.iterExec = iterExec;
    }

    /** {@inheritDoc} */
    @Override public Iterator<T> iterator() {
        boolean started = STATE_UPDATER.compareAndSet(this, IDLE, EXECUTION);

        // Only a cursor that is still idle may hand out its iterator.
        if (!started)
            throw new IgniteException("Iterator is already fetched or query was cancelled.");

        iter = iterExec.iterator();

        if (STATE_UPDATER.compareAndSet(this, EXECUTION, RESULT_READY)) {
            assert iter != null;

            return iter;
        }

        // The state left EXECUTION while the iterator was being obtained (race with cancel):
        // make sure the iterator resources are freed correctly.
        closeIter();

        throw new CacheException(new QueryCancelledException());
    }

    /** {@inheritDoc} */
    @Override public List<T> getAll() {
        List<T> res = new ArrayList<>();

        try {
            // iterator() does all the state checks.
            Iterator<T> it = iterator();

            while (it.hasNext())
                res.add(it.next());
        }
        finally {
            close();
        }

        return res;
    }

    /** {@inheritDoc} */
    @Override public void getAll(QueryCursorEx.Consumer<T> clo) throws IgniteCheckedException {
        try {
            Iterator<T> it = iterator();

            while (it.hasNext())
                clo.consume(it.next());
        }
        finally {
            close();
        }
    }

    /** {@inheritDoc} */
    @Override public void close() {
        // Retry until one of the transitions to CLOSED succeeds or the cursor is seen closed.
        for (;;) {
            if (state == CLOSED)
                return;

            // Result was ready: release the iterator.
            if (STATE_UPDATER.compareAndSet(this, RESULT_READY, CLOSED)) {
                closeIter();

                return;
            }

            // Iterator is still being obtained: invoke the cancellation closure, if there is one.
            if (STATE_UPDATER.compareAndSet(this, EXECUTION, CLOSED)) {
                if (cancel != null)
                    cancel.cancel();

                return;
            }

            // Nothing was started yet: just mark the cursor closed.
            if (STATE_UPDATER.compareAndSet(this, IDLE, CLOSED))
                return;
        }
    }

    /**
     * Closes the iterator if it implements {@link AutoCloseable}; otherwise does nothing.
     *
     * @throws IgniteException If closing the iterator failed; the original exception is the cause.
     */
    private void closeIter() {
        if (!(iter instanceof AutoCloseable))
            return;

        try {
            ((AutoCloseable)iter).close();
        }
        catch (Exception e) {
            throw new IgniteException(e);
        }
    }

    /**
     * @return {@code true} if this cursor corresponds to a {@link ResultSet} as a result of query,
     * {@code false} if query was modifying operation like INSERT, UPDATE, or DELETE.
     */
    public boolean isQuery() {
        return isQry;
    }

    /**
     * @param fieldsMeta SQL Fields query result metadata.
     */
    public void fieldsMeta(List<GridQueryFieldMetadata> fieldsMeta) {
        this.fieldsMeta = fieldsMeta;
    }

    /**
     * @return SQL Fields query result metadata.
     */
    @Override public List<GridQueryFieldMetadata> fieldsMeta() {
        return fieldsMeta;
    }

    /** {@inheritDoc} */
    @Override public String getFieldName(int idx) {
        assert this.fieldsMeta != null;

        return fieldsMeta.get(idx).fieldName();
    }

    /** {@inheritDoc} */
    @Override public int getColumnsCount() {
        assert this.fieldsMeta != null;

        return fieldsMeta.size();
    }

    /** Query cursor state. */
    protected enum State {
        /** Idle. */
        IDLE,

        /** Executing. */
        EXECUTION,

        /** Result ready. */
        RESULT_READY,

        /** Closed. */
        CLOSED
    }
}