blob: 1f2fe43e8a632ca1bdbfafa4e056e774c6408f72 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.cache.configuration.Factory;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.thread.IgniteThread;
/**
* Container for connection properties passed by various drivers (JDBC, ODBC drivers) having notion of an
* <b>SQL connection</b> - Ignite basically does not have one.<p>
* Also contains anything that a driver may need to share between threads processing queries of logically same client -
* see JDBC thin driver
*/
public class SqlClientContext implements AutoCloseable {
/** Kernal context. */
private final GridKernalContext ctx;
/** Distributed joins flag. */
private final boolean distributedJoins;
/** Enforce join order flag. */
private final boolean enforceJoinOrder;
/** Collocated flag. */
private final boolean collocated;
/** Replicated caches only flag. */
private final boolean replicatedOnly;
/** Lazy query execution flag. */
private final boolean lazy;
/** Skip reducer on update flag. */
private final boolean skipReducerOnUpdate;
/** Monitor. */
private final Object mux = new Object();
/** Allow overwrites for duplicate keys on streamed {@code INSERT}s. */
private boolean streamAllowOverwrite;
/** Parallel ops count per node for data streamer. */
private int streamNodeParOps;
/** Node buffer size for data streamer. */
private int streamNodeBufSize;
/** Auto flush frequency for streaming. */
private long streamFlushTimeout;
/** Stream ordered. */
private boolean streamOrdered;
/** Streamers for various caches. */
private volatile Map<String, IgniteDataStreamer<?, ?>> streamers;
/** Ordered batch thread. */
private IgniteThread orderedBatchThread;
/** Ordered batch worker factory. */
private Factory<GridWorker> orderedBatchWorkerFactory;
/** Count of the processed ordered batch requests. Used to wait end of processing all request before starts
* the processing the last request. */
private long totalProcessedOrderedReqs;
/** Logger. */
private final IgniteLogger log;
/**
* @param ctx Kernal context.
* @param orderedBatchWorkerFactory Ordered batch worker factory.
* @param distributedJoins Distributed joins flag.
* @param enforceJoinOrder Enforce join order flag.
* @param collocated Collocated flag.
* @param replicatedOnly Replicated caches only flag.
* @param lazy Lazy query execution flag.
* @param skipReducerOnUpdate Skip reducer on update flag.
*/
public SqlClientContext(GridKernalContext ctx, Factory<GridWorker> orderedBatchWorkerFactory,
boolean distributedJoins, boolean enforceJoinOrder,
boolean collocated, boolean replicatedOnly, boolean lazy, boolean skipReducerOnUpdate) {
this.ctx = ctx;
this.orderedBatchWorkerFactory = orderedBatchWorkerFactory;
this.distributedJoins = distributedJoins;
this.enforceJoinOrder = enforceJoinOrder;
this.collocated = collocated;
this.replicatedOnly = replicatedOnly;
this.lazy = lazy;
this.skipReducerOnUpdate = skipReducerOnUpdate;
log = ctx.log(SqlClientContext.class.getName());
}
/**
* Turn on streaming on this client context.
*
* @param allowOverwrite Whether streaming should overwrite existing values.
* @param flushFreq Flush frequency for streamers.
* @param perNodeBufSize Per node streaming buffer size.
* @param perNodeParOps Per node streaming parallel operations number.
* @param ordered Ordered stream flag.
*/
public void enableStreaming(boolean allowOverwrite, long flushFreq, int perNodeBufSize,
int perNodeParOps, boolean ordered) {
synchronized (mux) {
if (isStream())
return;
streamers = new HashMap<>();
this.streamAllowOverwrite = allowOverwrite;
this.streamFlushTimeout = flushFreq;
this.streamNodeBufSize = perNodeBufSize;
this.streamNodeParOps = perNodeParOps;
this.streamOrdered = ordered;
if (ordered) {
orderedBatchThread = new IgniteThread(orderedBatchWorkerFactory.create());
orderedBatchThread.start();
}
}
}
/**
* Turn off streaming on this client context - with closing all open streamers, if any.
*/
public void disableStreaming() {
synchronized (mux) {
if (!isStream())
return;
Iterator<IgniteDataStreamer<?, ?>> it = streamers.values().iterator();
while (it.hasNext()) {
IgniteDataStreamer<?, ?> streamer = it.next();
U.close(streamer, log);
it.remove();
}
streamers = null;
orderedBatchThread = null;
}
}
/**
* @return Collocated flag.
*/
public boolean isCollocated() {
return collocated;
}
/**
* @return Distributed joins flag.
*/
public boolean isDistributedJoins() {
return distributedJoins;
}
/**
* @return Enforce join order flag.
*/
public boolean isEnforceJoinOrder() {
return enforceJoinOrder;
}
/**
* @return Replicated caches only flag.
*/
public boolean isReplicatedOnly() {
return replicatedOnly;
}
/**
* @return Lazy query execution flag.
*/
public boolean isLazy() {
return lazy;
}
/**
* @return Skip reducer on update flag,
*/
public boolean isSkipReducerOnUpdate() {
return skipReducerOnUpdate;
}
/**
* @return Streaming state flag (on or off).
*/
public boolean isStream() {
synchronized (mux) {
return streamers != null;
}
}
/**
* @return Stream ordered flag.
*/
public boolean isStreamOrdered() {
synchronized (mux) {
return streamOrdered;
}
}
/**
* @param cacheName Cache name.
* @return Streamer for given cache.
*/
public IgniteDataStreamer<?, ?> streamerForCache(String cacheName) {
synchronized (mux) {
if (streamers == null)
return null;
IgniteDataStreamer<?, ?> res = streamers.get(cacheName);
if (res != null)
return res;
res = ctx.grid().dataStreamer(cacheName);
res.autoFlushFrequency(streamFlushTimeout);
res.allowOverwrite(streamAllowOverwrite);
if (streamNodeBufSize > 0)
res.perNodeBufferSize(streamNodeBufSize);
if (streamNodeParOps > 0)
res.perNodeParallelOperations(streamNodeParOps);
streamers.put(cacheName, res);
return res;
}
}
/**
* Waits when total processed ordered requests count to be equal to specified value.
* @param total Expected total processed request.
*/
public void waitTotalProcessedOrderedRequests(long total) {
synchronized (mux) {
while (totalProcessedOrderedReqs < total) {
try {
mux.wait();
}
catch (InterruptedException e) {
throw new IgniteException("Waiting for end of processing the last batch is interrupted", e);
}
}
}
}
/**
*
*/
public void orderedRequestProcessed() {
synchronized (mux) {
totalProcessedOrderedReqs++;
mux.notify();
}
}
/** {@inheritDoc} */
@Override public void close() throws Exception {
if (streamers == null)
return;
for (IgniteDataStreamer<?, ?> s : streamers.values())
U.close(s, log);
}
}