blob: daa5536251924560d719418b504f872c7845ad2a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.h2.twostep;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
import org.jetbrains.annotations.Nullable;
/**
 * Mapper query results.
 * <p>
 * Holds the per-query results and cancel tokens of a single map query request and tracks how many
 * results are still open. The {@link #active} counter, the {@link #cancelled} flag and the slots of
 * {@link #cancels} are modified only while holding the monitor of this object.
 */
class MapQueryResults {
    /** H2 indexing. */
    private final IgniteH2Indexing h2;

    /** Query request ID. */
    private final long qryReqId;

    /** Results by query index. A slot is set at most once, see {@link #addResult(int, MapQueryResult)}. */
    private final AtomicReferenceArray<MapQueryResult> results;

    /** Cancel tokens by query index. A slot is set to {@code null} when the corresponding result is closed. */
    private final GridQueryCancel[] cancels;

    /** Cache context. */
    private final GridCacheContext<?, ?> cctx;

    /** Lazy mode. */
    private final boolean lazy;

    /** Cancel flag. Set under the monitor of this object, read without locking. */
    private volatile boolean cancelled;

    /** Query context. */
    private final QueryContext qctx;

    /** Number of results that are not closed yet. Starts at the number of queries. */
    private int active;

    /**
     * Constructor.
     *
     * @param h2 Indexing instance.
     * @param qryReqId Query request ID.
     * @param qrys Number of queries.
     * @param cctx Cache context.
     * @param lazy Lazy flag.
     * @param qctx Query context.
     */
    MapQueryResults(IgniteH2Indexing h2, long qryReqId, int qrys, @Nullable GridCacheContext<?, ?> cctx,
        boolean lazy, QueryContext qctx) {
        this.h2 = h2;
        this.qryReqId = qryReqId;
        this.cctx = cctx;
        this.lazy = lazy;
        this.qctx = qctx;

        active = qrys;

        results = new AtomicReferenceArray<>(qrys);
        cancels = new GridQueryCancel[qrys];

        // Every query gets its own cancel token up front.
        for (int i = 0; i < cancels.length; i++)
            cancels[i] = new GridQueryCancel();
    }

    /**
     * @param qry Query result index.
     * @return Query result, or {@code null} if no result has been added for this index yet.
     */
    MapQueryResult result(int qry) {
        return results.get(qry);
    }

    /**
     * Get cancel token for query.
     *
     * @param qryIdx Query index.
     * @return Cancel token, or {@code null} if the result of this query has already been closed
     *      (see {@link #closeResult(int)}).
     */
    GridQueryCancel queryCancel(int qryIdx) {
        return cancels[qryIdx];
    }

    /**
     * Add result. A result may be set only once per query index.
     *
     * @param qryIdx Query result index.
     * @param res Result.
     * @throws IllegalStateException If a result has already been set for the given index.
     */
    void addResult(int qryIdx, MapQueryResult res) {
        if (!results.compareAndSet(qryIdx, null, res)) {
            throw new IllegalStateException("Map query result is already set [qryReqId=" + qryReqId +
                ", qryIdx=" + qryIdx + ']');
        }
    }

    /**
     * @return {@code true} If all results are closed.
     */
    synchronized boolean isAllClosed() {
        return active == 0;
    }

    /**
     * Cancels the query: cancels every query whose token is still present and then closes all results.
     * Only the first call has an effect, subsequent calls return immediately.
     */
    void cancel() {
        synchronized (this) {
            if (cancelled)
                return;

            cancelled = true;

            // Tokens of already closed results were set to null and are skipped.
            for (GridQueryCancel cancel : cancels) {
                if (cancel != null)
                    cancel.cancel();
            }
        }

        // Results are closed outside of the synchronized block: closeResult() takes the result lock
        // before the monitor of this object, so closing while holding the monitor
        // may cause a deadlock on <this> and MapQueryResult#lock.
        close();
    }

    /**
     * Wraps MapQueryResult#close to synchronize close vs cancel.
     * We have to do it because the connection returns to the pool after the ResultSet is closed, but
     * the whole MapQuery (that may contain several queries) may be canceled later.
     * <p>
     * Does nothing if no result has been added for the given index or the result is already closed.
     *
     * @param idx Map query (result) index.
     */
    void closeResult(int idx) {
        MapQueryResult res = results.get(idx);

        if (res != null) {
            boolean lastClosed = false;

            try {
                // Session isn't set for lazy=false queries.
                // Also session == null when result already closed.
                res.lock();
                res.lockTables();

                synchronized (this) {
                    if (!res.closed()) {
                        res.close();

                        // The statement of the closed result must not be canceled
                        // because statement & connection may be reused.
                        cancels[idx] = null;

                        active--;

                        lastClosed = active == 0;
                    }
                }
            }
            finally {
                res.unlock();
            }

            // The callback is invoked after the monitor and the result lock have been released.
            if (lastClosed)
                onAllClosed();
        }
    }

    /**
     * Close map results.
     */
    public void close() {
        for (int i = 0; i < results.length(); i++)
            closeResult(i);
    }

    /**
     * All map results closed callback. Releases the query context in lazy mode.
     */
    private void onAllClosed() {
        assert active == 0;

        if (lazy)
            releaseQueryContext();
    }

    /**
     * @return Cancel flag.
     */
    boolean cancelled() {
        return cancelled;
    }

    /**
     * @return Query request ID.
     */
    long queryRequestId() {
        return qryReqId;
    }

    /**
     * Release query context. The context is cleared only when it has no distributed join context.
     */
    public void releaseQueryContext() {
        if (qctx.distributedJoinContext() == null)
            qctx.clearContext(false);
    }

    /**
     * @return Lazy flag.
     */
    public boolean isLazy() {
        return lazy;
    }
}