| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.query.h2.twostep; |
| |
| import java.lang.reflect.Constructor; |
| import java.sql.Connection; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| import javax.cache.CacheException; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteClientDisconnectedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.events.DiscoveryEvent; |
| import org.apache.ignite.events.Event; |
| import org.apache.ignite.events.EventType; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.GridTopic; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.managers.communication.GridIoPolicy; |
| import org.apache.ignite.internal.managers.communication.GridMessageListener; |
| import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; |
| import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; |
| import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; |
| import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator; |
| import org.apache.ignite.internal.processors.query.h2.GridH2ResultSetIterator; |
| import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; |
| import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter; |
| import org.apache.ignite.internal.processors.query.h2.sql.GridSqlType; |
| import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest; |
| import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse; |
| import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest; |
| import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse; |
| import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryRequest; |
| import org.apache.ignite.internal.util.GridSpinBusyLock; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteFuture; |
| import org.apache.ignite.marshaller.Marshaller; |
| import org.apache.ignite.plugin.extensions.communication.Message; |
| import org.h2.command.ddl.CreateTableData; |
| import org.h2.engine.Session; |
| import org.h2.index.Cursor; |
| import org.h2.jdbc.JdbcConnection; |
| import org.h2.jdbc.JdbcResultSet; |
| import org.h2.jdbc.JdbcStatement; |
| import org.h2.result.ResultInterface; |
| import org.h2.result.Row; |
| import org.h2.table.Column; |
| import org.h2.util.IntArray; |
| import org.h2.value.Value; |
| import org.jetbrains.annotations.Nullable; |
| import org.jsr166.ConcurrentHashMap8; |
| |
| import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; |
| |
| /** |
| * Reduce query executor. |
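| * <p> |
| * Coordinates the reduce phase of a distributed two-step SQL query: map queries |
| * are sent to data nodes, result pages are collected into {@link GridMergeIndex}es, |
| * and the reduce query is then executed over the merged results. |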
| */ |
| public class GridReduceQueryExecutor { |
| /** Thread pool to process query messages. */ |
| public static final byte QUERY_POOL = GridIoPolicy.SYSTEM_POOL; |
| |
| /** */ |
| private GridKernalContext ctx; |
| |
| /** */ |
| private IgniteH2Indexing h2; |
| |
| /** */ |
| private IgniteLogger log; |
| |
| /** */ |
| private final AtomicLong reqIdGen = new AtomicLong(); |
| |
| /** */ |
| private final ConcurrentMap<Long, QueryRun> runs = new ConcurrentHashMap8<>(); |
| |
| /** */ |
| private volatile List<GridThreadLocalTable> fakeTbls = Collections.emptyList(); |
| |
| /** */ |
| private final Lock fakeTblsLock = new ReentrantLock(); |
| |
| /** */ |
| private static final Constructor<JdbcResultSet> CONSTRUCTOR; |
| |
| /** |
| * Statically initializes the reflective {@link JdbcResultSet} constructor. |
| */ |
| static { |
| try { |
| CONSTRUCTOR = JdbcResultSet.class.getDeclaredConstructor( |
| JdbcConnection.class, |
| JdbcStatement.class, |
| ResultInterface.class, |
| Integer.TYPE, |
| Boolean.TYPE, |
| Boolean.TYPE, |
| Boolean.TYPE |
| ); |
| |
| CONSTRUCTOR.setAccessible(true); |
| } |
| catch (NoSuchMethodException e) { |
| throw new IllegalStateException("Check H2 version in classpath.", e); |
| } |
| } |
| |
| /** */ |
| private final GridSpinBusyLock busyLock; |
| |
| /** |
| * @param busyLock Busy lock. |
| */ |
| public GridReduceQueryExecutor(GridSpinBusyLock busyLock) { |
| this.busyLock = busyLock; |
| } |
| |
| /** |
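| * Starts the executor, registering the query message listener and the node |
| * failure/leave event listener. |
| * |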
| * @param ctx Context. |
| * @param h2 H2 Indexing. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void start(final GridKernalContext ctx, final IgniteH2Indexing h2) throws IgniteCheckedException { |
| this.ctx = ctx; |
| this.h2 = h2; |
| |
| log = ctx.log(GridReduceQueryExecutor.class); |
| |
| ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() { |
| @Override public void onMessage(UUID nodeId, Object msg) { |
| if (!busyLock.enterBusy()) |
| return; |
| |
| try { |
| GridReduceQueryExecutor.this.onMessage(nodeId, msg); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| }); |
| |
| ctx.event().addLocalEventListener(new GridLocalEventListener() { |
| @Override public void onEvent(final Event evt) { |
| UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); |
| |
| for (QueryRun r : runs.values()) { |
| for (GridMergeIndex idx : r.idxs) { |
| if (idx.hasSource(nodeId)) { |
| handleNodeLeft(r, nodeId); |
| |
| break; |
| } |
| } |
| } |
| } |
| }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); |
| } |
| |
| /** |
| * @param r Query run. |
| * @param nodeId Left node ID. |
| */ |
| private void handleNodeLeft(QueryRun r, UUID nodeId) { |
| // Will attempt to retry. If the reduce query has already started, it will fail on the next page fetch. |
| retry(r, h2.readyTopologyVersion(), nodeId); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param msg Message. |
| */ |
| public void onMessage(UUID nodeId, Object msg) { |
| try { |
| assert msg != null; |
| |
| ClusterNode node = ctx.discovery().node(nodeId); |
| |
| if (node == null) |
| return; // Node left, ignore. |
| |
| boolean processed = true; |
| |
| if (msg instanceof GridQueryNextPageResponse) |
| onNextPage(node, (GridQueryNextPageResponse)msg); |
| else if (msg instanceof GridQueryFailResponse) |
| onFail(node, (GridQueryFailResponse)msg); |
| else |
| processed = false; |
| |
| if (processed && log.isDebugEnabled()) |
| log.debug("Processed response: " + nodeId + "->" + ctx.localNodeId() + " " + msg); |
| } |
| catch (Throwable th) { |
| U.error(log, "Failed to process message: " + msg, th); |
| } |
| } |
| |
| /** |
| * @param node Node. |
| * @param msg Message. |
| */ |
| private void onFail(ClusterNode node, GridQueryFailResponse msg) { |
| QueryRun r = runs.get(msg.queryRequestId()); |
| |
| fail(r, node.id(), msg.error()); |
| } |
| |
| /** |
| * @param r Query run. |
| * @param nodeId Failed node ID. |
| * @param msg Error message. |
| */ |
| private void fail(QueryRun r, UUID nodeId, String msg) { |
| if (r != null) |
| r.state(new CacheException("Failed to execute map query on the node: " + nodeId + ", " + msg), nodeId); |
| } |
| |
| /** |
| * @param node Node. |
| * @param msg Message. |
| */ |
| private void onNextPage(final ClusterNode node, GridQueryNextPageResponse msg) { |
| final long qryReqId = msg.queryRequestId(); |
| final int qry = msg.query(); |
| |
| final QueryRun r = runs.get(qryReqId); |
| |
| if (r == null) // Already finished with error or canceled. |
| return; |
| |
| final int pageSize = r.pageSize; |
| |
| GridMergeIndex idx = r.idxs.get(msg.query()); |
| |
| GridResultPage page; |
| |
| try { |
| page = new GridResultPage(ctx, node.id(), msg) { |
| @Override public void fetchNextPage() { |
| Object errState = r.state.get(); |
| |
| if (errState != null) { |
| CacheException err0 = errState instanceof CacheException ? (CacheException)errState : null; |
| |
| if (err0 != null && err0.getCause() instanceof IgniteClientDisconnectedException) |
| throw err0; |
| |
| CacheException e = new CacheException("Failed to fetch data from node: " + node.id()); |
| |
| if (err0 != null) |
| e.addSuppressed(err0); |
| |
| throw e; |
| } |
| |
| try { |
| GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, pageSize); |
| |
| if (node.isLocal()) |
| h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0); |
| else |
| ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, QUERY_POOL); |
| } |
| catch (IgniteCheckedException e) { |
| throw new CacheException("Failed to fetch data from node: " + node.id(), e); |
| } |
| } |
| }; |
| } |
| catch (Exception e) { |
| U.error(log, "Error in message.", e); |
| |
| fail(r, node.id(), "Error in message."); |
| |
| return; |
| } |
| |
| idx.addPage(page); |
| |
| if (msg.retry() != null) |
| retry(r, msg.retry(), node.id()); |
| else if (msg.allRows() != -1) // Only the first page contains the row count. |
| r.latch.countDown(); |
| } |
| |
| /** |
| * @param r Query run. |
| * @param retryVer Retry version. |
| * @param nodeId Node ID. |
| */ |
| private void retry(QueryRun r, AffinityTopologyVersion retryVer, UUID nodeId) { |
| r.state(retryVer, nodeId); |
| } |
| |
| /** |
| * @param cctx Cache context for main space. |
| * @param extraSpaces Extra spaces. |
| * @return {@code true} If preloading is active. |
| */ |
| private boolean isPreloadingActive(final GridCacheContext<?,?> cctx, List<String> extraSpaces) { |
| if (hasMovingPartitions(cctx)) |
| return true; |
| |
| if (extraSpaces != null) { |
| for (String extraSpace : extraSpaces) { |
| if (hasMovingPartitions(cacheContext(extraSpace))) |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @return {@code true} If cache context has at least one moving partition. |
| */ |
| private boolean hasMovingPartitions(GridCacheContext<?,?> cctx) { |
| GridDhtPartitionFullMap fullMap = cctx.topology().partitionMap(false); |
| |
| for (GridDhtPartitionMap2 map : fullMap.values()) { |
| if (map.hasMovingPartitions()) |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * @param name Cache name. |
| * @return Cache context. |
| */ |
| private GridCacheContext<?,?> cacheContext(String name) { |
| return ctx.cache().internalCache(name).context(); |
| } |
| |
| /** |
| * @param topVer Topology version. |
| * @param cctx Cache context for main space. |
| * @param extraSpaces Extra spaces. |
| * @return Data nodes or {@code null} if repartitioning started and we need to retry. |
| */ |
| private Collection<ClusterNode> stableDataNodes( |
| AffinityTopologyVersion topVer, |
| final GridCacheContext<?,?> cctx, |
| List<String> extraSpaces |
| ) { |
| String space = cctx.name(); |
| |
| Set<ClusterNode> nodes = new HashSet<>(dataNodes(space, topVer)); |
| |
| if (F.isEmpty(nodes)) |
| throw new CacheException("Failed to find data nodes for cache: " + space); |
| |
| if (!F.isEmpty(extraSpaces)) { |
| for (String extraSpace : extraSpaces) { |
| GridCacheContext<?,?> extraCctx = cacheContext(extraSpace); |
| |
| if (extraCctx.isLocal()) |
| continue; // No consistency guarantees for local caches. |
| |
| if (cctx.isReplicated() && !extraCctx.isReplicated()) |
| throw new CacheException("Queries running on replicated cache should not contain JOINs " + |
| "with partitioned tables [rCache=" + cctx.name() + ", pCache=" + extraSpace + "]"); |
| |
| Collection<ClusterNode> extraNodes = dataNodes(extraSpace, topVer); |
| |
| if (F.isEmpty(extraNodes)) |
| throw new CacheException("Failed to find data nodes for cache: " + extraSpace); |
| |
| if (cctx.isReplicated() && extraCctx.isReplicated()) { |
| nodes.retainAll(extraNodes); |
| |
| if (nodes.isEmpty()) { |
| if (isPreloadingActive(cctx, extraSpaces)) |
| return null; // Retry. |
| else |
| throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() + |
| ", cache2=" + extraSpace + "]"); |
| } |
| } |
| else if (!cctx.isReplicated() && extraCctx.isReplicated()) { |
| if (!extraNodes.containsAll(nodes)) { |
| if (isPreloadingActive(cctx, extraSpaces)) |
| return null; // Retry. |
| else |
| throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() + |
| ", cache2=" + extraSpace + "]"); |
| } |
| } |
| else if (!cctx.isReplicated() && !extraCctx.isReplicated()) { |
| if (extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes)) { |
| if (isPreloadingActive(cctx, extraSpaces)) |
| return null; // Retry. |
| else |
| throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() + |
| ", cache2=" + extraSpace + "]"); |
| } |
| } |
| else |
| throw new IllegalStateException(); |
| } |
| } |
| |
| return nodes; |
| } |
| |
| /** |
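| * Executes the given two-step query and returns an iterator over the reduced rows. |
| * <p> |
| * A minimal usage sketch (illustrative only; {@code reduceQryExec} stands for this |
| * executor and {@code qry} is assumed to come from the query splitter): |
| * <pre>{@code |
| * Iterator<List<?>> it = reduceQryExec.query(cctx, qry, false); |
| * |
| * while (it.hasNext()) |
| *     System.out.println(it.next()); // One reduced result row. |
| * }</pre> |
| * |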
| * @param cctx Cache context. |
| * @param qry Query. |
| * @param keepBinary Keep binary. |
| * @return Cursor. |
| */ |
| public Iterator<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, boolean keepBinary) { |
| for (int attempt = 0;; attempt++) { |
| if (attempt != 0) { |
| try { |
| Thread.sleep(attempt * 10); // Wait for exchange. |
| } |
| catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| |
| throw new CacheException("Query was interrupted.", e); |
| } |
| } |
| |
| long qryReqId = reqIdGen.incrementAndGet(); |
| |
| QueryRun r = new QueryRun(); |
| |
| r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize(); |
| |
| r.idxs = new ArrayList<>(qry.mapQueries().size()); |
| |
| String space = cctx.name(); |
| |
| r.conn = (JdbcConnection)h2.connectionForSpace(space); |
| |
| AffinityTopologyVersion topVer = h2.readyTopologyVersion(); |
| |
| List<String> extraSpaces = extraSpaces(space, qry.spaces()); |
| |
| Collection<ClusterNode> nodes; |
| |
| // Explicit partition mapping for unstable topology. |
| Map<ClusterNode, IntArray> partsMap = null; |
| |
| if (isPreloadingActive(cctx, extraSpaces)) { |
| if (cctx.isReplicated()) |
| nodes = replicatedUnstableDataNodes(cctx, extraSpaces); |
| else { |
| partsMap = partitionedUnstableDataNodes(cctx, extraSpaces); |
| |
| nodes = partsMap == null ? null : partsMap.keySet(); |
| } |
| } |
| else |
| nodes = stableDataNodes(topVer, cctx, extraSpaces); |
| |
| if (nodes == null) |
| continue; // Retry. |
| |
| assert !nodes.isEmpty(); |
| |
| if (cctx.isReplicated() || qry.explain()) { |
| assert qry.explain() || !nodes.contains(ctx.discovery().localNode()) : |
| "We must be on a client node."; |
| |
| // Select a random data node to run the query on replicated data or to get an EXPLAIN PLAN from a single node. |
| nodes = Collections.singleton(F.rand(nodes)); |
| } |
| |
| int tblIdx = 0; |
| |
| final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable(); |
| |
| for (GridCacheSqlQuery mapQry : qry.mapQueries()) { |
| GridMergeIndex idx; |
| |
| if (!skipMergeTbl) { |
| GridMergeTable tbl; |
| |
| try { |
| tbl = createMergeTable(r.conn, mapQry, qry.explain()); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| |
| idx = tbl.getScanIndex(null); |
| |
| fakeTable(r.conn, tblIdx++).setInnerTable(tbl); |
| } |
| else |
| idx = GridMergeIndexUnsorted.createDummy(); |
| |
| for (ClusterNode node : nodes) |
| idx.addSource(node.id()); |
| |
| r.idxs.add(idx); |
| } |
| |
| r.latch = new CountDownLatch(r.idxs.size() * nodes.size()); |
| |
| runs.put(qryReqId, r); |
| |
| try { |
| if (ctx.clientDisconnected()) { |
| throw new CacheException("Query was cancelled, client node disconnected.", |
| new IgniteClientDisconnectedException(ctx.cluster().clientReconnectFuture(), |
| "Client node disconnected.")); |
| } |
| |
| Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries(); |
| |
| if (qry.explain()) { |
| mapQrys = new ArrayList<>(qry.mapQueries().size()); |
| |
| for (GridCacheSqlQuery mapQry : qry.mapQueries()) |
| mapQrys.add(new GridCacheSqlQuery("EXPLAIN " + mapQry.query(), mapQry.parameters())); |
| } |
| |
| if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshal parameters for remote nodes. |
| Marshaller m = ctx.config().getMarshaller(); |
| |
| for (GridCacheSqlQuery mapQry : mapQrys) |
| mapQry.marshallParams(m); |
| } |
| |
| boolean retry = false; |
| |
| if (send(nodes, |
| new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null), partsMap)) { |
| awaitAllReplies(r, nodes); |
| |
| Object state = r.state.get(); |
| |
| if (state != null) { |
| if (state instanceof CacheException) { |
| CacheException err = (CacheException)state; |
| |
| if (err.getCause() instanceof IgniteClientDisconnectedException) |
| throw err; |
| |
| throw new CacheException("Failed to run map query remotely.", err); |
| } |
| |
| if (state instanceof AffinityTopologyVersion) { |
| retry = true; |
| |
| // If a remote node asks us to retry then we have an outdated full partition map. |
| h2.awaitForReadyTopologyVersion((AffinityTopologyVersion)state); |
| } |
| } |
| } |
| else // Send failed. |
| retry = true; |
| |
| Iterator<List<?>> resIter = null; |
| |
| if (!retry) { |
| if (qry.explain()) |
| return explainPlan(r.conn, space, qry); |
| |
| if (skipMergeTbl) { |
| List<List<?>> res = new ArrayList<>(); |
| |
| assert r.idxs.size() == 1 : r.idxs; |
| |
| GridMergeIndex idx = r.idxs.get(0); |
| |
| Cursor cur = idx.findInStream(null, null); |
| |
| while (cur.next()) { |
| Row row = cur.get(); |
| |
| int cols = row.getColumnCount(); |
| |
| List<Object> resRow = new ArrayList<>(cols); |
| |
| for (int c = 0; c < cols; c++) |
| resRow.add(row.getValue(c).getObject()); |
| |
| res.add(resRow); |
| } |
| |
| resIter = res.iterator(); |
| } |
| else { |
| GridCacheSqlQuery rdc = qry.reduceQuery(); |
| |
| // Statement caching is prohibited here because we can't guarantee correct merge index reuse. |
| ResultSet res = h2.executeSqlQueryWithTimer(space, |
| r.conn, |
| rdc.query(), |
| F.asList(rdc.parameters()), |
| false); |
| |
| resIter = new Iter(res); |
| } |
| } |
| |
| for (GridMergeIndex idx : r.idxs) { |
| if (!idx.fetchedAll()) // We have to explicitly cancel queries on remote nodes. |
| send(nodes, new GridQueryCancelRequest(qryReqId), null); |
| } |
| |
| if (retry) { |
| if (Thread.currentThread().isInterrupted()) |
| throw new IgniteInterruptedCheckedException("Query was interrupted."); |
| |
| continue; |
| } |
| |
| return new GridQueryCacheObjectsIterator(resIter, cctx, keepBinary); |
| } |
| catch (IgniteCheckedException | RuntimeException e) { |
| U.closeQuiet(r.conn); |
| |
| if (e instanceof CacheException) |
| throw (CacheException)e; |
| |
| Throwable cause = e; |
| |
| if (e instanceof IgniteCheckedException) { |
| Throwable disconnectedErr = |
| ((IgniteCheckedException)e).getCause(IgniteClientDisconnectedException.class); |
| |
| if (disconnectedErr != null) |
| cause = disconnectedErr; |
| } |
| |
| throw new CacheException("Failed to run reduce query locally.", cause); |
| } |
| finally { |
| if (!runs.remove(qryReqId, r)) |
| U.warn(log, "Query run was already removed: " + qryReqId); |
| |
| if (!skipMergeTbl) { |
| for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++) |
| fakeTable(null, i).setInnerTable(null); // Drop all merge tables. |
| } |
| } |
| } |
| } |
| |
| /** |
| * @param r Query run. |
| * @param nodes Nodes to periodically check for liveness. |
| * @throws IgniteInterruptedCheckedException If interrupted. |
| */ |
| private void awaitAllReplies(QueryRun r, Collection<ClusterNode> nodes) |
| throws IgniteInterruptedCheckedException { |
| while (!U.await(r.latch, 500, TimeUnit.MILLISECONDS)) { |
| for (ClusterNode node : nodes) { |
| if (!ctx.discovery().alive(node)) { |
| handleNodeLeft(r, node.id()); |
| |
| assert r.latch.getCount() == 0; |
| |
| return; |
| } |
| } |
| } |
| } |
| |
| /** |
| * @param idx Table index. |
| * @return Table name. |
| */ |
| private static String table(int idx) { |
| return GridSqlQuerySplitter.table(idx).getSQL(); |
| } |
| |
| /** |
| * Gets or creates a fake table for the given index. |
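| * <p> |
| * The table list is grown copy-on-write under {@code fakeTblsLock}, so readers |
| * can access the volatile {@code fakeTbls} list without locking. |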
| * |
| * @param c Connection. |
| * @param idx Index of table. |
| * @return Table. |
| */ |
| private GridThreadLocalTable fakeTable(Connection c, int idx) { |
| List<GridThreadLocalTable> tbls = fakeTbls; |
| |
| assert tbls.size() >= idx; |
| |
| if (tbls.size() == idx) { // If a table for this index does not exist, create one. |
| fakeTblsLock.lock(); |
| |
| try { |
| if ((tbls = fakeTbls).size() == idx) { // Double-check inside the lock. |
| try (Statement stmt = c.createStatement()) { |
| stmt.executeUpdate("CREATE TABLE " + table(idx) + |
| "(fake BOOL) ENGINE \"" + GridThreadLocalTable.Engine.class.getName() + '"'); |
| } |
| catch (SQLException e) { |
| throw new IllegalStateException(e); |
| } |
| |
| List<GridThreadLocalTable> newTbls = new ArrayList<>(tbls.size() + 1); |
| |
| newTbls.addAll(tbls); |
| newTbls.add(GridThreadLocalTable.Engine.getCreated()); |
| |
| fakeTbls = tbls = newTbls; |
| } |
| } |
| finally { |
| fakeTblsLock.unlock(); |
| } |
| } |
| |
| return tbls.get(idx); |
| } |
| |
| /** |
| * Calculates data nodes for replicated caches on unstable topology. |
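| * <p> |
| * The owner sets of all participating replicated caches are intersected; an empty |
| * intersection means rebalancing is in progress and the caller must retry. |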
| * |
| * @param cctx Cache context for main space. |
| * @param extraSpaces Extra spaces. |
| * @return Collection of all data nodes owning all the caches or {@code null} for retry. |
| */ |
| private Collection<ClusterNode> replicatedUnstableDataNodes(final GridCacheContext<?,?> cctx, |
| List<String> extraSpaces) { |
| assert cctx.isReplicated() : cctx.name() + " must be replicated"; |
| |
| Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx); |
| |
| if (F.isEmpty(nodes)) |
| return null; // Retry. |
| |
| if (!F.isEmpty(extraSpaces)) { |
| for (String extraSpace : extraSpaces) { |
| GridCacheContext<?,?> extraCctx = cacheContext(extraSpace); |
| |
| if (extraCctx.isLocal()) |
| continue; |
| |
| if (!extraCctx.isReplicated()) |
| throw new CacheException("Queries running on replicated cache should not contain JOINs " + |
| "with tables in partitioned caches [rCache=" + cctx.name() + ", pCache=" + extraSpace + "]"); |
| |
| Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx); |
| |
| if (F.isEmpty(extraOwners)) |
| return null; // Retry. |
| |
| nodes.retainAll(extraOwners); |
| |
| if (nodes.isEmpty()) |
| return null; // Retry. |
| } |
| } |
| |
| return nodes; |
| } |
| |
| /** |
| * @param space Cache name. |
| * @param topVer Topology version. |
| * @return Collection of data nodes. |
| */ |
| private Collection<ClusterNode> dataNodes(String space, AffinityTopologyVersion topVer) { |
| Collection<ClusterNode> res = ctx.discovery().cacheAffinityNodes(space, topVer); |
| |
| return res != null ? res : Collections.<ClusterNode>emptySet(); |
| } |
| |
| /** |
| * Collects all the nodes owning all the partitions for the given replicated cache. |
| * |
| * @param cctx Cache context. |
| * @return Owning nodes or {@code null} if we can't find owners for some partitions. |
| */ |
| private Set<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?,?> cctx) { |
| assert cctx.isReplicated() : cctx.name() + " must be replicated"; |
| |
| String space = cctx.name(); |
| |
| Set<ClusterNode> dataNodes = new HashSet<>(dataNodes(space, NONE)); |
| |
| if (dataNodes.isEmpty()) |
| throw new CacheException("Failed to find data nodes for cache: " + space); |
| |
| // Find all the nodes owning all the partitions for replicated cache. |
| for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) { |
| List<ClusterNode> owners = cctx.topology().owners(p); |
| |
| if (F.isEmpty(owners)) |
| return null; // Retry. |
| |
| dataNodes.retainAll(owners); |
| |
| if (dataNodes.isEmpty()) |
| return null; // Retry. |
| } |
| |
| return dataNodes; |
| } |
| |
| /** |
| * Calculates partition mapping for partitioned cache on unstable topology. |
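| * <p> |
| * For each partition the owner sets of all participating partitioned caches are |
| * intersected. For example (illustrative), if partition 0 is owned by nodes |
| * {A, B} in the main cache and by {B, C} in an extra cache, it is mapped to the |
| * single intersection node {B}. |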
| * |
| * @param cctx Cache context for main space. |
| * @param extraSpaces Extra spaces. |
| * @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry. |
| */ |
| @SuppressWarnings("unchecked") |
| private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(final GridCacheContext<?,?> cctx, |
| List<String> extraSpaces) { |
| assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned"; |
| |
| final int partsCnt = cctx.affinity().partitions(); |
| |
| if (extraSpaces != null) { // Check correct number of partitions for partitioned caches. |
| for (String extraSpace : extraSpaces) { |
| GridCacheContext<?,?> extraCctx = cacheContext(extraSpace); |
| |
| if (extraCctx.isReplicated() || extraCctx.isLocal()) |
| continue; |
| |
| int parts = extraCctx.affinity().partitions(); |
| |
| if (parts != partsCnt) |
| throw new CacheException("Number of partitions must be the same for correct collocation [cache1=" + |
| cctx.name() + ", parts1=" + partsCnt + ", cache2=" + extraSpace + ", parts2=" + parts + "]"); |
| } |
| } |
| |
| Set<ClusterNode>[] partLocs = new Set[partsCnt]; |
| |
| // Fill partition locations for main cache. |
| for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) { |
| List<ClusterNode> owners = cctx.topology().owners(p); |
| |
| if (F.isEmpty(owners)) { |
| if (!F.isEmpty(dataNodes(cctx.name(), NONE))) |
| return null; // Retry. |
| |
| throw new CacheException("Failed to find data nodes [cache=" + cctx.name() + ", part=" + p + "]"); |
| } |
| |
| partLocs[p] = new HashSet<>(owners); |
| } |
| |
| if (extraSpaces != null) { |
| // Find owner intersections for each participating partitioned cache partition. |
| // We need this for logical collocation between different partitioned caches with the same affinity. |
| for (String extraSpace : extraSpaces) { |
| GridCacheContext<?,?> extraCctx = cacheContext(extraSpace); |
| |
| if (extraCctx.isReplicated() || extraCctx.isLocal()) |
| continue; |
| |
| for (int p = 0, parts = extraCctx.affinity().partitions(); p < parts; p++) { |
| List<ClusterNode> owners = extraCctx.topology().owners(p); |
| |
| if (F.isEmpty(owners)) { |
| if (!F.isEmpty(dataNodes(extraSpace, NONE))) |
| return null; // Retry. |
| |
| throw new CacheException("Failed to find data nodes [cache=" + extraSpace + ", part=" + p + "]"); |
| } |
| |
| if (partLocs[p] == null) |
| partLocs[p] = new HashSet<>(owners); |
| else { |
| partLocs[p].retainAll(owners); // Intersection of owners. |
| |
| if (partLocs[p].isEmpty()) |
| return null; // Intersection is empty -> retry. |
| } |
| } |
| } |
| |
| // Filter out nodes where not all the replicated caches are loaded. |
| for (String extraSpace : extraSpaces) { |
| GridCacheContext<?,?> extraCctx = cacheContext(extraSpace); |
| |
| if (!extraCctx.isReplicated()) |
| continue; |
| |
| Set<ClusterNode> dataNodes = replicatedUnstableDataNodes(extraCctx); |
| |
| if (F.isEmpty(dataNodes)) |
| return null; // Retry. |
| |
| for (Set<ClusterNode> partLoc : partLocs) { |
| partLoc.retainAll(dataNodes); |
| |
| if (partLoc.isEmpty()) |
| return null; // Retry. |
| } |
| } |
| } |
| |
| // Collect the final partitions mapping. |
| Map<ClusterNode, IntArray> res = new HashMap<>(); |
| |
| // Partitions in each IntArray are added in ascending order; this ordering is important. |
| for (int p = 0; p < partLocs.length; p++) { |
| Set<ClusterNode> pl = partLocs[p]; |
| |
| assert !F.isEmpty(pl) : pl; |
| |
| ClusterNode n = pl.size() == 1 ? F.first(pl) : F.rand(pl); |
| |
| IntArray parts = res.get(n); |
| |
| if (parts == null) |
| res.put(n, parts = new IntArray()); |
| |
| parts.add(p); |
| } |
| |
| return res; |
| } |
| |
| /** |
| * @param mainSpace Main space. |
| * @param allSpaces All spaces. |
| * @return List of all extra spaces or {@code null} if none. |
| */ |
| private List<String> extraSpaces(String mainSpace, Set<String> allSpaces) { |
| if (F.isEmpty(allSpaces) || (allSpaces.size() == 1 && allSpaces.contains(mainSpace))) |
| return null; |
| |
| ArrayList<String> res = new ArrayList<>(allSpaces.size()); |
| |
| for (String space : allSpaces) { |
| if (!F.eq(space, mainSpace)) |
| res.add(space); |
| } |
| |
| return res; |
| } |
| |
| /** |
| * @param c Connection. |
| * @param space Space. |
| * @param qry Query. |
| * @return Cursor for plans. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private Iterator<List<?>> explainPlan(JdbcConnection c, String space, GridCacheTwoStepQuery qry) |
| throws IgniteCheckedException { |
| List<List<?>> lists = new ArrayList<>(); |
| |
| for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++) { |
| ResultSet rs = h2.executeSqlQueryWithTimer(space, c, "SELECT PLAN FROM " + table(i), null, false); |
| |
| lists.add(F.asList(getPlan(rs))); |
| } |
| |
| int tblIdx = 0; |
| |
| for (GridCacheSqlQuery mapQry : qry.mapQueries()) { |
| GridMergeTable tbl = createMergeTable(c, mapQry, false); |
| |
| fakeTable(c, tblIdx++).setInnerTable(tbl); |
| } |
| |
| GridCacheSqlQuery rdc = qry.reduceQuery(); |
| |
| ResultSet rs = h2.executeSqlQueryWithTimer(space, |
| c, |
| "EXPLAIN " + rdc.query(), |
| F.asList(rdc.parameters()), |
| false); |
| |
| lists.add(F.asList(getPlan(rs))); |
| |
| return lists.iterator(); |
| } |
| |
| /** |
| * @param rs Result set. |
| * @return Plan. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private String getPlan(ResultSet rs) throws IgniteCheckedException { |
| try { |
| if (!rs.next()) |
| throw new IllegalStateException(); |
| |
| return rs.getString(1); |
| } |
| catch (SQLException e) { |
| throw new IgniteCheckedException(e); |
| } |
| } |
| |
| /** |
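| * Sends the given message to the provided nodes, processing the local node last |
| * (directly through the map query executor) to allow parallel execution. |
| * |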
| * @param nodes Nodes. |
| * @param msg Message. |
| * @param partsMap Partitions. |
| * @return {@code true} If all messages were sent successfully. |
| */ |
| private boolean send( |
| Collection<ClusterNode> nodes, |
| Message msg, |
| Map<ClusterNode,IntArray> partsMap |
| ) { |
| boolean locNodeFound = false; |
| |
| boolean ok = true; |
| |
| for (ClusterNode node : nodes) { |
| if (node.isLocal()) { |
| locNodeFound = true; |
| |
| continue; |
| } |
| |
| try { |
| ctx.io().send(node, GridTopic.TOPIC_QUERY, copy(msg, node, partsMap), QUERY_POOL); |
| } |
| catch (IgniteCheckedException e) { |
| ok = false; |
| |
| U.warn(log, e.getMessage()); |
| } |
| } |
| |
| if (locNodeFound) // Local node goes last to allow parallel execution. |
| h2.mapQueryExecutor().onMessage(ctx.localNodeId(), copy(msg, ctx.discovery().localNode(), partsMap)); |
| |
| return ok; |
| } |
| |
| /** |
| * @param msg Message to copy. |
| * @param node Node. |
| * @param partsMap Partitions map. |
| * @return Copy of message with partitions set. |
| */ |
| private Message copy(Message msg, ClusterNode node, Map<ClusterNode,IntArray> partsMap) { |
| if (partsMap == null) |
| return msg; |
| |
| GridQueryRequest res = new GridQueryRequest((GridQueryRequest)msg); |
| |
| IntArray parts = partsMap.get(node); |
| |
| assert parts != null : node; |
| |
| int[] partsArr = new int[parts.size()]; |
| |
| parts.toArray(partsArr); |
| |
| res.partitions(partsArr); |
| |
| return res; |
| } |
| |
| /** |
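| * Creates a merge table whose columns match the map query output columns (or a |
| * single {@code PLAN} column when building an {@code EXPLAIN} plan). |
| * |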
| * @param conn Connection. |
| * @param qry Query. |
| * @param explain Explain. |
| * @return Table. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private GridMergeTable createMergeTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain) |
| throws IgniteCheckedException { |
| try { |
| Session ses = (Session)conn.getSession(); |
| |
| CreateTableData data = new CreateTableData(); |
| |
| data.tableName = "T___"; |
| data.schema = ses.getDatabase().getSchema(ses.getCurrentSchemaName()); |
| data.create = true; |
| |
| if (!explain) { |
| LinkedHashMap<String,?> colsMap = qry.columns(); |
| |
| assert colsMap != null; |
| |
| ArrayList<Column> cols = new ArrayList<>(colsMap.size()); |
| |
| for (Map.Entry<String,?> e : colsMap.entrySet()) { |
| String alias = e.getKey(); |
| GridSqlType t = (GridSqlType)e.getValue(); |
| |
| assert !F.isEmpty(alias); |
| |
| Column c = new Column(alias, t.type(), t.precision(), t.scale(), t.displaySize()); |
| |
| cols.add(c); |
| } |
| |
| data.columns = cols; |
| } |
| else |
| data.columns = planColumns(); |
| |
| return new GridMergeTable(data, ctx); |
| } |
| catch (Exception e) { |
| U.closeQuiet(conn); |
| |
| throw new IgniteCheckedException(e); |
| } |
| } |
| |
| /** |
| * @return Columns. |
| */ |
| private static ArrayList<Column> planColumns() { |
| ArrayList<Column> res = new ArrayList<>(1); |
| |
| res.add(new Column("PLAN", Value.STRING)); |
| |
| return res; |
| } |
| |
| /** |
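| * Called when the client node gets disconnected: fails all the active query runs. |
| * |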
| * @param reconnectFut Reconnect future. |
| */ |
| public void onDisconnected(IgniteFuture<?> reconnectFut) { |
| CacheException err = new CacheException("Query was cancelled, client node disconnected.", |
| new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.")); |
| |
| for (Map.Entry<Long, QueryRun> e : runs.entrySet()) |
| e.getValue().disconnected(err); |
| } |
| |
| /** |
| * State of a single reduce query run. |
| */ |
| private static class QueryRun { |
| /** */ |
| private List<GridMergeIndex> idxs; |
| |
| /** */ |
| private CountDownLatch latch; |
| |
| /** */ |
| private JdbcConnection conn; |
| |
| /** */ |
| private int pageSize; |
| |
| /** Either a CacheException in case of error, or an AffinityTopologyVersion if a retry is needed. */ |
| private final AtomicReference<Object> state = new AtomicReference<>(); |
| |
| /** |
| * @param o Fail state object. |
| * @param nodeId Node ID. |
| */ |
| void state(Object o, @Nullable UUID nodeId) { |
| assert o != null; |
| assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass(); |
| |
| if (!state.compareAndSet(null, o)) |
| return; |
| |
| while (latch.getCount() != 0) // We don't need to wait for all nodes to reply. |
| latch.countDown(); |
| |
| for (GridMergeIndex idx : idxs) // Fail all merge indexes. |
| idx.fail(nodeId); |
| } |
| |
| /** |
| * @param e Error. |
| */ |
| void disconnected(CacheException e) { |
| if (!state.compareAndSet(null, e)) |
| return; |
| |
| while (latch.getCount() != 0) // We don't need to wait for all nodes to reply. |
| latch.countDown(); |
| |
| for (GridMergeIndex idx : idxs) // Fail all merge indexes. |
| idx.fail(e); |
| } |
| } |
| |
| /** |
| * Iterator over the reduce query result set. |
| */ |
| private static class Iter extends GridH2ResultSetIterator<List<?>> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * @param data Result set. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected Iter(ResultSet data) throws IgniteCheckedException { |
| super(data, true); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected List<?> createRow() { |
| ArrayList<Object> res = new ArrayList<>(row.length); |
| |
| Collections.addAll(res, row); |
| |
| return res; |
| } |
| } |
| } |