Merge branch 'master' into ignite-11213
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 71a704c..08762a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -2192,6 +2192,15 @@
}
/**
+ * For testing only.
+ *
+ * @return Current version to wait for.
+ */
+ public AffinityTopologyVersion mergeExchangesTestWaitVersion() {
+ return exchMergeTestWaitVer;
+ }
+
+ /**
* @param curFut Current exchange future.
* @param msg Message.
* @return {@code True} if node is stopping.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index f4043ab..170d7e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -4390,6 +4390,7 @@
try {
boolean crdChanged = false;
boolean allReceived = false;
+ boolean wasMerged = false;
ClusterNode crd0;
@@ -4405,8 +4406,7 @@
newCrdFut0.onNodeLeft(node.id());
synchronized (mux) {
- if (!srvNodes.remove(node))
- return;
+ srvNodes.remove(node);
boolean rmvd = remaining.remove(node.id());
@@ -4415,6 +4415,7 @@
if (mergedJoinExchMsgs.get(node.id()) == null) {
mergedJoinExchMsgs.remove(node.id());
+ wasMerged = true;
rmvd = true;
}
}
@@ -4513,11 +4514,16 @@
}
if (allReceived) {
+ boolean wasMerged0 = wasMerged;
+
cctx.kernalContext().getSystemExecutorService().submit(new Runnable() {
@Override public void run() {
awaitSingleMapUpdates();
- onAllReceived(null);
+ if (wasMerged0)
+ finishExchangeOnCoordinator(null);
+ else
+ onAllReceived(null);
}
});
}
@@ -4866,14 +4872,17 @@
/** {@inheritDoc} */
@Override public String toString() {
Set<UUID> remaining;
+ Set<UUID> mergedJoinExch;
synchronized (mux) {
remaining = new HashSet<>(this.remaining);
+ mergedJoinExch = mergedJoinExchMsgs == null ? null : new HashSet<>(mergedJoinExchMsgs.keySet());
}
return S.toString(GridDhtPartitionsExchangeFuture.class, this,
"evtLatch", evtLatch == null ? "null" : evtLatch.getCount(),
"remaining", remaining,
+ "mergedJoinExchMsgs", mergedJoinExch,
"super", super.toString());
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/ExchangeMergeStaleServerNodesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/ExchangeMergeStaleServerNodesTest.java
new file mode 100644
index 0000000..0a59f2e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/ExchangeMergeStaleServerNodesTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class ExchangeMergeStaleServerNodesTest extends GridCommonAbstractTest {
+ /** */
+ private Map<String, DelayableCommunicationSpi> commSpis;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ CommunicationSpi commSpi = commSpis == null ? null : commSpis.get(igniteInstanceName);
+
+ if (commSpi != null)
+ cfg.setCommunicationSpi(commSpi);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testServersFailAfterMerge() throws Exception {
+ DelayableCommunicationSpi delaySpi1 = new DelayableCommunicationSpi((msg) -> {
+ if (msg instanceof GridDhtPartitionsSingleMessage) {
+ GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage)msg;
+
+ return singleMsg.exchangeId() != null && singleMsg.exchangeId().topologyVersion().equals(new AffinityTopologyVersion(2, 0));
+ }
+
+ return false;
+ });
+
+ commSpis = F.asMap(
+ getTestIgniteInstanceName(0), new DelayableCommunicationSpi((msg) -> false),
+ getTestIgniteInstanceName(1), delaySpi1,
+ getTestIgniteInstanceName(2), new DelayableCommunicationSpi((msg) -> msg instanceof GridDhtPartitionsSingleMessage),
+ getTestIgniteInstanceName(3), new DelayableCommunicationSpi((msg) -> false)
+ );
+
+ try {
+ IgniteEx crd = startGrid(0);
+
+ GridCachePartitionExchangeManager<Object, Object> exchMgr = crd.context().cache().context().exchange();
+
+ exchMgr.mergeExchangesTestWaitVersion(new AffinityTopologyVersion(3, 0), null);
+
+ // Single message for this node is blocked until further notice.
+ IgniteInternalFuture<IgniteEx> fut = GridTestUtils.runAsync(() -> startGrid(1), "starter1");
+
+ GridTestUtils.waitForCondition(() -> exchMgr.lastTopologyFuture().exchangeId().topologyVersion()
+ .equals(new AffinityTopologyVersion(2, 0)), getTestTimeout());
+
+ IgniteInternalFuture<IgniteEx> futFail = GridTestUtils.runAsync(() -> startGrid(2), "starter2");
+
+ GridTestUtils.waitForCondition(exchMgr::hasPendingExchange, getTestTimeout());
+
+ // Unblock message to proceed merging.
+ delaySpi1.replay(crd.cluster().localNode().id());
+
+ // Wait for merged exchange.
+ GridTestUtils.waitForCondition(
+ () -> exchMgr.mergeExchangesTestWaitVersion() == null, getTestTimeout());
+
+ futFail.cancel();
+ stopGrid(getTestIgniteInstanceName(2), true);
+
+ fut.get();
+
+ try {
+ futFail.get();
+ }
+ catch (IgniteCheckedException ignore) {
+ // No-op.
+ }
+
+ // Check that next nodes can successfully join topology.
+ startGrid(3);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ *
+ */
+ private static class DelayableCommunicationSpi extends TcpCommunicationSpi {
+ /** */
+ private ConcurrentMap<UUID, Collection<Runnable>> delayed = new ConcurrentHashMap<>();
+
+ /** */
+ private IgnitePredicate<Message> delayPred;
+
+ /**
+ * @param delayPred Delay predicate.
+ */
+ private DelayableCommunicationSpi(IgnitePredicate<Message> delayPred) {
+ this.delayPred = delayPred;
+ }
+
+ /**
+ * @param nodeId Node ID to replay.
+ */
+ private void replay(UUID nodeId) {
+ Collection<Runnable> old = delayed.replace(nodeId, new ConcurrentLinkedDeque<>());
+
+ if (old != null) {
+ for (Runnable task : old)
+ task.run();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+ final Message msg0 = ((GridIoMessage)msg).message();
+
+ if (delayPred.apply(msg0)) {
+ delayed.computeIfAbsent(
+ node.id(),
+ (nodeId) -> new ConcurrentLinkedDeque<>()
+ ).add(new Runnable() {
+ @Override public void run() {
+ DelayableCommunicationSpi.super.sendMessage(node, msg, ackC);
+ }
+ });
+
+ log.info("Delayed message: " + msg0);
+ }
+ else {
+ try {
+ super.sendMessage(node, msg, ackC);
+ }
+ catch (Exception e) {
+ U.log(null, e);
+ }
+ }
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
index 707244d..471b437 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
@@ -34,6 +34,7 @@
import org.apache.ignite.internal.processors.cache.datastructures.IgniteExchangeLatchManagerCoordinatorFailTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheParallelStartTest;
+import org.apache.ignite.internal.processors.cache.distributed.ExchangeMergeStaleServerNodesTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCache150ClientsTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPartitionExchangeTest;
@@ -74,6 +75,7 @@
// Other non-tx tests.
ignoredTests.add(CacheExchangeMergeTest.class);
+ ignoredTests.add(ExchangeMergeStaleServerNodesTest.class);
ignoredTests.add(IgniteExchangeLatchManagerCoordinatorFailTest.class);
ignoredTests.add(PartitionsExchangeCoordinatorFailoverTest.class);
ignoredTests.add(CacheParallelStartTest.class);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 2388cfb..444152d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -32,6 +32,7 @@
import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheParallelStartTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheTryLockMultithreadedTest;
+import org.apache.ignite.internal.processors.cache.distributed.ExchangeMergeStaleServerNodesTest;
import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCache150ClientsTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheThreadLocalTxTest;
@@ -79,6 +80,7 @@
GridTestUtils.addTestIfNeeded(suite, IgnitePessimisticTxSuspendResumeTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CacheExchangeMergeTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, ExchangeMergeStaleServerNodesTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, TxRollbackOnTimeoutTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, TxRollbackOnTimeoutNoDeadlockDetectionTest.class, ignoredTests);
diff --git a/modules/hibernate-4.2/pom.xml b/modules/hibernate-4.2/pom.xml
index bd5f50aa..a1c92af 100644
--- a/modules/hibernate-4.2/pom.xml
+++ b/modules/hibernate-4.2/pom.xml
@@ -63,7 +63,7 @@
<dependency>
<groupId>org.ow2.jotm</groupId>
<artifactId>jotm-core</artifactId>
- <version>2.1.9</version>
+ <version>${jotm.version}</version>
<scope>test</scope>
</dependency>
@@ -136,14 +136,14 @@
<dependency>
<groupId>org.jboss.spec.javax.rmi</groupId>
<artifactId>jboss-rmi-api_1.0_spec</artifactId>
- <version>1.0.6.Final</version>
+ <version>${jboss.rmi.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
- <version>2.1</version>
+ <version>${jaxb.api.version}</version>
<scope>test</scope>
</dependency>
diff --git a/modules/hibernate-5.1/pom.xml b/modules/hibernate-5.1/pom.xml
index 6df74b2..6c0b950 100644
--- a/modules/hibernate-5.1/pom.xml
+++ b/modules/hibernate-5.1/pom.xml
@@ -63,7 +63,7 @@
<dependency>
<groupId>org.ow2.jotm</groupId>
<artifactId>jotm-core</artifactId>
- <version>2.1.9</version>
+ <version>${jotm.version}</version>
<scope>test</scope>
</dependency>
@@ -136,28 +136,28 @@
<dependency>
<groupId>org.jboss.spec.javax.rmi</groupId>
<artifactId>jboss-rmi-api_1.0_spec</artifactId>
- <version>1.0.6.Final</version>
+ <version>${jboss.rmi.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
- <version>2.1</version>
+ <version>${jaxb.api.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-core</artifactId>
- <version>2.1.14</version>
+ <version>${jaxb.impl.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>
- <version>2.1.14</version>
+ <version>${jaxb.impl.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
diff --git a/modules/hibernate-5.3/pom.xml b/modules/hibernate-5.3/pom.xml
index 7003c7b..c6c846b 100644
--- a/modules/hibernate-5.3/pom.xml
+++ b/modules/hibernate-5.3/pom.xml
@@ -63,7 +63,7 @@
<dependency>
<groupId>org.ow2.jotm</groupId>
<artifactId>jotm-core</artifactId>
- <version>2.1.9</version>
+ <version>${jotm.version}</version>
<scope>test</scope>
</dependency>
@@ -130,6 +130,34 @@
<version>1.4.8</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ <version>${jaxb.api.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.sun.xml.bind</groupId>
+ <artifactId>jaxb-core</artifactId>
+ <version>${jaxb.impl.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.sun.xml.bind</groupId>
+ <artifactId>jaxb-impl</artifactId>
+ <version>${jaxb.impl.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.jboss.spec.javax.rmi</groupId>
+ <artifactId>jboss-rmi-api_1.0_spec</artifactId>
+ <version>${jboss.rmi.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
diff --git a/modules/ignored-tests/pom.xml b/modules/ignored-tests/pom.xml
index 2684495..b9b4823 100644
--- a/modules/ignored-tests/pom.xml
+++ b/modules/ignored-tests/pom.xml
@@ -227,7 +227,7 @@
<dependency>
<groupId>org.ow2.jotm</groupId>
<artifactId>jotm-core</artifactId>
- <version>2.1.9</version>
+ <version>${jotm.version}</version>
<scope>test</scope>
</dependency>
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
index a3d865c..53e2af1 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
@@ -48,6 +48,7 @@
import org.apache.ignite.internal.stat.IoStatisticsHolder;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.result.SearchRow;
import org.h2.table.IndexColumn;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
@@ -396,7 +397,7 @@
inlineSizeRecomendation(row);
- org.h2.result.SearchRow rowData = getRow(io, pageAddr, idx);
+ SearchRow rowData = getRow(io, pageAddr, idx);
for (int i = lastIdxUsed, len = cols.length; i < len; i++) {
IndexColumn col = cols[i];
@@ -500,7 +501,7 @@
* @param row Grid H2 row related to given inline indexes.
*/
@SuppressWarnings({"ConditionalBreakInInfiniteLoop", "IfMayBeConditional"})
- private void inlineSizeRecomendation(org.h2.result.SearchRow row) {
+ private void inlineSizeRecomendation(SearchRow row) {
//Do the check only for put operations.
if(!(row instanceof H2CacheRow))
return;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlDistributedUpdateRun.java
similarity index 90%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlDistributedUpdateRun.java
index 9e7b9ae..9982141 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlDistributedUpdateRun.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.h2.twostep;
+package org.apache.ignite.internal.processors.query.h2.dml;
import java.util.Arrays;
import java.util.HashSet;
@@ -31,7 +31,7 @@
/**
* Context for DML operation on reducer node.
*/
-class DistributedUpdateRun {
+public class DmlDistributedUpdateRun {
/** Expected number of responses. */
private final int nodeCount;
@@ -52,7 +52,7 @@
*
* @param nodeCount Number of nodes to await results from.
*/
- DistributedUpdateRun(int nodeCount) {
+ public DmlDistributedUpdateRun(int nodeCount) {
this.nodeCount = nodeCount;
rspNodes = new HashSet<>(nodeCount);
@@ -61,7 +61,7 @@
/**
* @return Result future.
*/
- GridFutureAdapter<UpdateResult> future() {
+ public GridFutureAdapter<UpdateResult> future() {
return fut;
}
@@ -69,7 +69,7 @@
* Handle disconnection.
* @param e Pre-formatted error.
*/
- void handleDisconnect(CacheException e) {
+ public void handleDisconnect(CacheException e) {
fut.onDone(new IgniteCheckedException("Update failed because client node have disconnected.", e));
}
@@ -78,7 +78,7 @@
*
* @param nodeId Node id.
*/
- void handleNodeLeft(UUID nodeId) {
+ public void handleNodeLeft(UUID nodeId) {
fut.onDone(new IgniteCheckedException("Update failed because map node left topology [nodeId=" + nodeId + "]"));
}
@@ -88,7 +88,7 @@
* @param id Node id.
* @param msg Response message.
*/
- void handleResponse(UUID id, GridH2DmlResponse msg) {
+ public void handleResponse(UUID id, GridH2DmlResponse msg) {
synchronized (this) {
if (!rspNodes.add(id))
return; // ignore duplicated messages
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ProxyIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ProxyIndex.java
index 95a253b..4897bd4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ProxyIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ProxyIndex.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.h2.opt;
+import org.apache.ignite.internal.processors.query.h2.opt.join.ProxyDistributedLookupBatch;
import org.h2.engine.Session;
import org.h2.index.BaseIndex;
import org.h2.index.Cursor;
@@ -34,7 +35,6 @@
import java.util.HashSet;
import java.util.List;
-import java.util.concurrent.Future;
/**
* Allows to have 'free' index for alias columns
@@ -154,7 +154,12 @@
@Override public IndexLookupBatch createLookupBatch(TableFilter[] filters, int filter) {
IndexLookupBatch batch = idx.createLookupBatch(filters, filter);
- return batch != null ? new ProxyIndexLookupBatch(batch) : null;
+ if (batch == null)
+ return null;
+
+ GridH2RowDescriptor rowDesc = ((GridH2Table)idx.getTable()).rowDescriptor();
+
+ return new ProxyDistributedLookupBatch(batch, rowDesc);
}
/** {@inheritDoc} */
@@ -162,45 +167,4 @@
// No-op. Will be removed when underlying index is removed
}
- /** Proxy lookup batch */
- private class ProxyIndexLookupBatch implements IndexLookupBatch {
-
- /** Underlying normal lookup batch */
- private final IndexLookupBatch target;
-
- /**
- * Creates proxy lookup batch.
- *
- * @param target Underlying index lookup batch.
- */
- private ProxyIndexLookupBatch(IndexLookupBatch target) {
- this.target = target;
- }
-
- /** {@inheritDoc} */
- @Override public boolean addSearchRows(SearchRow first, SearchRow last) {
- GridH2RowDescriptor desc = ((GridH2Table)idx.getTable()).rowDescriptor();
- return target.addSearchRows(desc.prepareProxyIndexRow(first), desc.prepareProxyIndexRow(last));
- }
-
- /** {@inheritDoc} */
- @Override public boolean isBatchFull() {
- return target.isBatchFull();
- }
-
- /** {@inheritDoc} */
- @Override public List<Future<Cursor>> find() {
- return target.find();
- }
-
- /** {@inheritDoc} */
- @Override public String getPlanSQL() {
- return target.getPlanSQL();
- }
-
- /** {@inheritDoc} */
- @Override public void reset(boolean beforeQuery) {
- target.reset(beforeQuery);
- }
- }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
index 652c950..6a1f7f6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
@@ -31,6 +31,7 @@
import org.apache.ignite.internal.processors.query.h2.H2TableDescriptor;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.h2.message.DbException;
+import org.h2.result.SearchRow;
import org.h2.value.DataType;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
@@ -349,7 +350,7 @@
* @param row Source row.
* @return Result.
*/
- public org.h2.result.SearchRow prepareProxyIndexRow(org.h2.result.SearchRow row) {
+ public SearchRow prepareProxyIndexRow(SearchRow row) {
if (row == null)
return null;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/ProxyDistributedLookupBatch.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/ProxyDistributedLookupBatch.java
new file mode 100644
index 0000000..652bd4b
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/ProxyDistributedLookupBatch.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
+import org.h2.index.Cursor;
+import org.h2.index.IndexLookupBatch;
+import org.h2.result.SearchRow;
+
+import java.util.List;
+import java.util.concurrent.Future;
+
+/**
+ * Lookip batch for proxy indexes.
+ */
+public class ProxyDistributedLookupBatch implements IndexLookupBatch {
+ /** Underlying normal lookup batch */
+ private final IndexLookupBatch delegate;
+
+ /** Row descriptor. */
+ private final GridH2RowDescriptor rowDesc;
+
+ /**
+ * Creates proxy lookup batch.
+ *
+ * @param delegate Underlying index lookup batch.
+ * @param rowDesc Row descriptor.
+ */
+ public ProxyDistributedLookupBatch(IndexLookupBatch delegate, GridH2RowDescriptor rowDesc) {
+ this.delegate = delegate;
+ this.rowDesc = rowDesc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addSearchRows(SearchRow first, SearchRow last) {
+ SearchRow firstProxy = rowDesc.prepareProxyIndexRow(first);
+ SearchRow lastProxy = rowDesc.prepareProxyIndexRow(last);
+
+ return delegate.addSearchRows(firstProxy, lastProxy);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isBatchFull() {
+ return delegate.isBatchFull();
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Future<Cursor>> find() {
+ return delegate.find();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getPlanSQL() {
+ return delegate.getPlanSQL();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset(boolean beforeQuery) {
+ delegate.reset(beforeQuery);
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 5ae2806..3ff6dfe 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -20,7 +20,6 @@
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -70,6 +69,7 @@
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.UpdateResult;
+import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedUpdateRun;
import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlSortColumn;
@@ -141,10 +141,10 @@
private final ConcurrentMap<Long, ReduceQueryRun> runs = new ConcurrentHashMap<>();
/** Contexts of running DML requests. */
- private final ConcurrentMap<Long, DistributedUpdateRun> updRuns = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Long, DmlDistributedUpdateRun> updRuns = new ConcurrentHashMap<>();
/** */
- private volatile List<GridThreadLocalTable> fakeTbls = Collections.emptyList();
+ private volatile List<ReduceTableWrapper> fakeTbls = Collections.emptyList();
/** */
private final Lock fakeTblsLock = new ReentrantLock();
@@ -207,7 +207,7 @@
UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
for (ReduceQueryRun r : runs.values()) {
- for (GridMergeIndex idx : r.indexes()) {
+ for (ReduceIndex idx : r.indexes()) {
if (idx.hasSource(nodeId)) {
handleNodeLeft(r, nodeId);
@@ -216,7 +216,7 @@
}
}
- for (DistributedUpdateRun r : updRuns.values())
+ for (DmlDistributedUpdateRun r : updRuns.values())
r.handleNodeLeft(nodeId);
}
@@ -306,12 +306,12 @@
final int pageSize = r.pageSize();
- GridMergeIndex idx = r.indexes().get(msg.query());
+ ReduceIndex idx = r.indexes().get(msg.query());
- GridResultPage page;
+ ReduceResultPage page;
try {
- page = new GridResultPage(ctx, node.id(), msg) {
+ page = new ReduceResultPage(ctx, node.id(), msg) {
@Override public void fetchNextPage() {
if (r.hasErrorOrRetry()) {
if (r.exception() != null)
@@ -388,7 +388,7 @@
* @param dataPageScanEnabled If data page scan is enabled.
* @return Rows iterator.
*/
- @SuppressWarnings("BusyWait")
+ @SuppressWarnings({"BusyWait", "IfMayBeConditional"})
public Iterator<List<?>> query(
String schemaName,
final GridCacheTwoStepQuery qry,
@@ -559,17 +559,17 @@
final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable() || singlePartMode;
final int segmentsPerIndex = qry.explain() || isReplicatedOnly ? 1 :
- findFirstPartitioned(cacheIds).config().getQueryParallelism();
+ mapper.findFirstPartitioned(cacheIds).config().getQueryParallelism();
int replicatedQrysCnt = 0;
final Collection<ClusterNode> finalNodes = nodes;
for (GridCacheSqlQuery mapQry : mapQueries) {
- GridMergeIndex idx;
+ ReduceIndex idx;
if (!skipMergeTbl) {
- GridMergeTable tbl;
+ ReduceTable tbl;
try {
tbl = createMergeTable(r.connection(), mapQry, qry.explain());
@@ -583,7 +583,7 @@
fakeTable(r.connection(), tblIdx++).innerTable(tbl);
}
else
- idx = GridMergeIndexUnsorted.createDummy(ctx);
+ idx = ReduceIndexUnsorted.createDummy(ctx);
// If the query has only replicated tables, we have to run it on a single node only.
if (!mapQry.isPartitioned()) {
@@ -745,7 +745,7 @@
if (!retry) {
if (skipMergeTbl) {
- resIter = new GridMergeIndexIterator(this,
+ resIter = new ReduceIndexIterator(this,
finalNodes,
r,
qryReqId,
@@ -877,6 +877,7 @@
* @param cancel Cancel state.
* @return Update result, or {@code null} when some map node doesn't support distributed DML.
*/
+ @SuppressWarnings("IfMayBeConditional")
public UpdateResult update(
String schemaName,
List<Integer> cacheIds,
@@ -920,7 +921,7 @@
}
}
- final DistributedUpdateRun r = new DistributedUpdateRun(nodes.size());
+ final DmlDistributedUpdateRun r = new DmlDistributedUpdateRun(nodes.size());
int flags = enforceJoinOrder ? GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER : 0;
@@ -991,7 +992,7 @@
try {
long reqId = msg.requestId();
- DistributedUpdateRun r = updRuns.get(reqId);
+ DmlDistributedUpdateRun r = updRuns.get(reqId);
if (r == null) {
U.warn(log, "Unexpected dml response (will ignore). [localNodeId=" + ctx.localNodeId() + ", nodeId=" +
@@ -1009,24 +1010,6 @@
}
/**
- * @param cacheIds Cache IDs.
- * @return The first partitioned cache context.
- */
- private GridCacheContext<?,?> findFirstPartitioned(List<Integer> cacheIds) {
- for (int i = 0; i < cacheIds.size(); i++) {
- GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i));
-
- if (i == 0 && cctx.isLocal())
- throw new CacheException("Cache is LOCAL: " + cctx.name());
-
- if (!cctx.isReplicated() && !cctx.isLocal())
- return cctx;
- }
-
- throw new IllegalStateException("Failed to find partitioned cache.");
- }
-
- /**
* Returns true if the exception is triggered by query cancel.
*
* @param e Exception.
@@ -1050,7 +1033,7 @@
if (distributedJoins)
send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
else {
- for (GridMergeIndex idx : r.indexes()) {
+ for (ReduceIndex idx : r.indexes()) {
if (!idx.fetchedAll()) {
send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
@@ -1096,8 +1079,8 @@
* @param idx Index of table.
* @return Table.
*/
- private GridThreadLocalTable fakeTable(Connection c, int idx) {
- List<GridThreadLocalTable> tbls = fakeTbls;
+ private ReduceTableWrapper fakeTable(Connection c, int idx) {
+ List<ReduceTableWrapper> tbls = fakeTbls;
assert tbls.size() >= idx;
@@ -1106,18 +1089,12 @@
try {
if ((tbls = fakeTbls).size() == idx) { // Double check inside of lock.
- try (Statement stmt = c.createStatement()) {
- stmt.executeUpdate("CREATE TABLE " + mergeTableIdentifier(idx) +
- "(fake BOOL) ENGINE \"" + GridThreadLocalTable.Engine.class.getName() + '"');
- }
- catch (SQLException e) {
- throw new IllegalStateException(e);
- }
+ ReduceTableWrapper tbl = ReduceTableEngine.create(c, idx);
- List<GridThreadLocalTable> newTbls = new ArrayList<>(tbls.size() + 1);
+ List<ReduceTableWrapper> newTbls = new ArrayList<>(tbls.size() + 1);
newTbls.addAll(tbls);
- newTbls.add(GridThreadLocalTable.Engine.getCreated());
+ newTbls.add(tbl);
fakeTbls = tbls = newTbls;
}
@@ -1151,7 +1128,7 @@
int tblIdx = 0;
for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
- GridMergeTable tbl = createMergeTable(c, mapQry, false);
+ ReduceTable tbl = createMergeTable(c, mapQry, false);
fakeTable(c, tblIdx++).innerTable(tbl);
}
@@ -1250,7 +1227,7 @@
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
- private GridMergeTable createMergeTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain)
+ private ReduceTable createMergeTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain)
throws IgniteCheckedException {
try {
Session ses = (Session)conn.getSession();
@@ -1286,25 +1263,25 @@
boolean sortedIndex = !F.isEmpty(qry.sortColumns());
- GridMergeTable tbl = new GridMergeTable(data);
+ ReduceTable tbl = new ReduceTable(data);
ArrayList<Index> idxs = new ArrayList<>(2);
if (explain) {
- idxs.add(new GridMergeIndexUnsorted(ctx, tbl,
+ idxs.add(new ReduceIndexUnsorted(ctx, tbl,
sortedIndex ? MERGE_INDEX_SORTED : MERGE_INDEX_UNSORTED));
}
else if (sortedIndex) {
List<GridSqlSortColumn> sortCols = (List<GridSqlSortColumn>)qry.sortColumns();
- GridMergeIndexSorted sortedMergeIdx = new GridMergeIndexSorted(ctx, tbl, MERGE_INDEX_SORTED,
+ ReduceIndexSorted sortedMergeIdx = new ReduceIndexSorted(ctx, tbl, MERGE_INDEX_SORTED,
GridSqlSortColumn.toIndexColumns(tbl, sortCols));
- idxs.add(GridMergeTable.createScanIndex(sortedMergeIdx));
+ idxs.add(ReduceTable.createScanIndex(sortedMergeIdx));
idxs.add(sortedMergeIdx);
}
else
- idxs.add(new GridMergeIndexUnsorted(ctx, tbl, MERGE_INDEX_UNSORTED));
+ idxs.add(new ReduceIndexUnsorted(ctx, tbl, MERGE_INDEX_UNSORTED));
tbl.indexes(idxs);
@@ -1338,7 +1315,7 @@
for (Map.Entry<Long, ReduceQueryRun> e : runs.entrySet())
e.getValue().disconnected(err);
- for (DistributedUpdateRun r: updRuns.values())
+ for (DmlDistributedUpdateRun r: updRuns.values())
r.handleDisconnect(err);
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceBlockList.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceBlockList.java
new file mode 100644
index 0000000..87d4c14
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceBlockList.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.RandomAccess;
+
+/**
+ *
+ */
+public class ReduceBlockList<Z> extends AbstractList<Z> implements RandomAccess {
+ /** */
+ private final List<List<Z>> blocks;
+
+ /** */
+ private int size;
+
+ /** */
+ private final int maxBlockSize;
+
+ /** */
+ private final int shift;
+
+ /** */
+ private final int mask;
+
+ /**
+ * @param maxBlockSize Max block size.
+ */
+ public ReduceBlockList(int maxBlockSize) {
+ assert U.isPow2(maxBlockSize);
+
+ this.maxBlockSize = maxBlockSize;
+
+ shift = Integer.numberOfTrailingZeros(maxBlockSize);
+ mask = maxBlockSize - 1;
+
+ blocks = new ArrayList<>();
+ blocks.add(new ArrayList<Z>());
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return size;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean add(Z z) {
+ size++;
+
+ List<Z> lastBlock = lastBlock();
+
+ lastBlock.add(z);
+
+ if (lastBlock.size() == maxBlockSize)
+ blocks.add(new ArrayList<Z>());
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Z get(int idx) {
+ return blocks.get(idx >>> shift).get(idx & mask);
+ }
+
+ /**
+ * @return Last block.
+ */
+ public List<Z> lastBlock() {
+ return ReduceIndex.last(blocks);
+ }
+
+ /**
+ * @return Evicted block.
+ */
+ public List<Z> evictFirstBlock() {
+ // Remove head block.
+ List<Z> res = blocks.remove(0);
+
+ size -= res.size();
+
+ return res;
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndex.java
similarity index 80%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndex.java
index 1c1cfaf..8266940 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndex.java
@@ -17,15 +17,12 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
-import java.util.AbstractList;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.RandomAccess;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -58,7 +55,7 @@
/**
* Merge index.
*/
-public abstract class GridMergeIndex extends BaseIndex {
+public abstract class ReduceIndex extends BaseIndex {
/** */
private static final int MAX_FETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_MAX_SIZE, 10_000);
@@ -66,8 +63,8 @@
private static final int PREFETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE, 1024);
/** */
- private static final AtomicReferenceFieldUpdater<GridMergeIndex, ConcurrentMap> lastPagesUpdater =
- AtomicReferenceFieldUpdater.newUpdater(GridMergeIndex.class, ConcurrentMap.class, "lastPages");
+ private static final AtomicReferenceFieldUpdater<ReduceIndex, ConcurrentMap> LAST_PAGES_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(ReduceIndex.class, ConcurrentMap.class, "lastPages");
static {
if (!U.isPow2(PREFETCH_SIZE)) {
@@ -83,6 +80,7 @@
/** */
protected final Comparator<SearchRow> firstRowCmp = new Comparator<SearchRow>() {
+ @SuppressWarnings("ComparatorMethodParameterNotUsed")
@Override public int compare(SearchRow rowInList, SearchRow searchRow) {
int res = compareRows(rowInList, searchRow);
@@ -92,6 +90,7 @@
/** */
protected final Comparator<SearchRow> lastRowCmp = new Comparator<SearchRow>() {
+ @SuppressWarnings("ComparatorMethodParameterNotUsed")
@Override public int compare(SearchRow rowInList, SearchRow searchRow) {
int res = compareRows(rowInList, searchRow);
@@ -108,19 +107,17 @@
/**
* Will be r/w from query execution thread only, does not need to be threadsafe.
*/
- private final BlockList<Row> fetched;
+ private final ReduceBlockList<Row> fetched;
/** */
private Row lastEvictedRow;
/** */
- private volatile int fetchedCnt;
-
- /** */
private final GridKernalContext ctx;
- /** */
- private volatile ConcurrentMap<SourceKey, Integer> lastPages;
+ /** DO NOT change name field of this field, updated through {@link #LAST_PAGES_UPDATER} */
+ @SuppressWarnings("unused")
+ private volatile ConcurrentMap<ReduceSourceKey, Integer> lastPages;
/**
* @param ctx Context.
@@ -129,8 +126,8 @@
* @param type Type.
* @param cols Columns.
*/
- public GridMergeIndex(GridKernalContext ctx,
- GridMergeTable tbl,
+ protected ReduceIndex(GridKernalContext ctx,
+ ReduceTable tbl,
String name,
IndexType type,
IndexColumn[] cols
@@ -143,10 +140,10 @@
/**
* @param ctx Context.
*/
- protected GridMergeIndex(GridKernalContext ctx) {
+ protected ReduceIndex(GridKernalContext ctx) {
this.ctx = ctx;
- fetched = new BlockList<>(PREFETCH_SIZE);
+ fetched = new ReduceBlockList<>(PREFETCH_SIZE);
}
/**
@@ -222,8 +219,8 @@
* @param queue Queue to poll.
* @return Next page.
*/
- private GridResultPage takeNextPage(Pollable<GridResultPage> queue) {
- GridResultPage page;
+ private ReduceResultPage takeNextPage(Pollable<ReduceResultPage> queue) {
+ ReduceResultPage page;
for (;;) {
try {
@@ -247,9 +244,9 @@
* @param iter Current iterator.
* @return The same or new iterator.
*/
- protected final Iterator<Value[]> pollNextIterator(Pollable<GridResultPage> queue, Iterator<Value[]> iter) {
+ protected final Iterator<Value[]> pollNextIterator(Pollable<ReduceResultPage> queue, Iterator<Value[]> iter) {
if (!iter.hasNext()) {
- GridResultPage page = takeNextPage(queue);
+ ReduceResultPage page = takeNextPage(queue);
if (!page.isLast())
page.fetchNextPage(); // Failed will throw an exception here.
@@ -279,7 +276,7 @@
if (nodeId == null)
nodeId = F.first(sources);
- addPage0(new GridResultPage(null, nodeId, null) {
+ addPage0(new ReduceResultPage(null, nodeId, null) {
@Override public boolean isFail() {
return true;
}
@@ -307,9 +304,9 @@
if (allRows < 0 || res.page() != 0)
return;
- ConcurrentMap<SourceKey,Integer> lp = lastPages;
+ ConcurrentMap<ReduceSourceKey,Integer> lp = lastPages;
- if (lp == null && !lastPagesUpdater.compareAndSet(this, null, lp = new ConcurrentHashMap<>()))
+ if (lp == null && !LAST_PAGES_UPDATER.compareAndSet(this, null, lp = new ConcurrentHashMap<>()))
lp = lastPages;
assert pageSize > 0: pageSize;
@@ -318,14 +315,14 @@
assert lastPage >= 0: lastPage;
- if (lp.put(new SourceKey(nodeId, res.segmentId()), lastPage) != null)
+ if (lp.put(new ReduceSourceKey(nodeId, res.segmentId()), lastPage) != null)
throw new IllegalStateException();
}
/**
* @param page Page.
*/
- private void markLastPage(GridResultPage page) {
+ private void markLastPage(ReduceResultPage page) {
GridQueryNextPageResponse res = page.response();
if (!res.last()) {
@@ -333,12 +330,12 @@
initLastPages(nodeId, res);
- ConcurrentMap<SourceKey,Integer> lp = lastPages;
+ ConcurrentMap<ReduceSourceKey,Integer> lp = lastPages;
if (lp == null)
return; // It was not initialized --> wait for last page flag.
- Integer lastPage = lp.get(new SourceKey(nodeId, res.segmentId()));
+ Integer lastPage = lp.get(new ReduceSourceKey(nodeId, res.segmentId()));
if (lastPage == null)
return; // This node may use the new protocol --> wait for last page flag.
@@ -356,7 +353,7 @@
/**
* @param page Page.
*/
- public final void addPage(GridResultPage page) {
+ public final void addPage(ReduceResultPage page) {
markLastPage(page);
addPage0(page);
}
@@ -365,16 +362,16 @@
* @param lastPage Real last page.
* @return Created dummy page.
*/
- protected final GridResultPage createDummyLastPage(GridResultPage lastPage) {
+ protected final ReduceResultPage createDummyLastPage(ReduceResultPage lastPage) {
assert !lastPage.isDummyLast(); // It must be a real last page.
- return new GridResultPage(ctx, lastPage.source(), null).setLast(true);
+ return new ReduceResultPage(ctx, lastPage.source(), null).setLast(true);
}
/**
* @param page Page.
*/
- protected abstract void addPage0(GridResultPage page);
+ protected abstract void addPage0(ReduceResultPage page);
/** {@inheritDoc} */
@Override public final Cursor find(Session ses, SearchRow first, SearchRow last) {
@@ -512,7 +509,7 @@
* @param l List.
* @return Last element.
*/
- private static <Z> Z last(List<Z> l) {
+ public static <Z> Z last(List<Z> l) {
return l.get(l.size() - 1);
}
@@ -637,8 +634,6 @@
cur = Integer.MAX_VALUE; // We were not able to fetch anything. Done.
else {
// Update fetched count.
- fetchedCnt += rows.size() - cur;
-
if (haveBounds()) {
cur = findBounds();
@@ -676,88 +671,6 @@
}
}
- /** */
- enum State {
- UNINITIALIZED, INITIALIZED, FINISHED
- }
-
- /**
- */
- private static final class BlockList<Z> extends AbstractList<Z> implements RandomAccess {
- /** */
- private final List<List<Z>> blocks;
-
- /** */
- private int size;
-
- /** */
- private final int maxBlockSize;
-
- /** */
- private final int shift;
-
- /** */
- private final int mask;
-
- /**
- * @param maxBlockSize Max block size.
- */
- private BlockList(int maxBlockSize) {
- assert U.isPow2(maxBlockSize);
-
- this.maxBlockSize = maxBlockSize;
-
- shift = Integer.numberOfTrailingZeros(maxBlockSize);
- mask = maxBlockSize - 1;
-
- blocks = new ArrayList<>();
- blocks.add(new ArrayList<Z>());
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return size;
- }
-
- /** {@inheritDoc} */
- @Override public boolean add(Z z) {
- size++;
-
- List<Z> lastBlock = lastBlock();
-
- lastBlock.add(z);
-
- if (lastBlock.size() == maxBlockSize)
- blocks.add(new ArrayList<Z>());
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public Z get(int idx) {
- return blocks.get(idx >>> shift).get(idx & mask);
- }
-
- /**
- * @return Last block.
- */
- private List<Z> lastBlock() {
- return last(blocks);
- }
-
- /**
- * @return Evicted block.
- */
- private List<Z> evictFirstBlock() {
- // Remove head block.
- List<Z> res = blocks.remove(0);
-
- size -= res.size();
-
- return res;
- }
- }
-
/**
* Pollable.
*/
@@ -771,39 +684,4 @@
E poll(long timeout, TimeUnit unit) throws InterruptedException;
}
- /**
- */
- private static class SourceKey {
- final UUID nodeId;
-
- /** */
- final int segment;
-
- /**
- * @param nodeId Node ID.
- * @param segment Segment.
- */
- SourceKey(UUID nodeId, int segment) {
- this.nodeId = nodeId;
- this.segment = segment;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- SourceKey sourceKey = (SourceKey)o;
-
- if (segment != sourceKey.segment) return false;
- return nodeId.equals(sourceKey.nodeId);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int result = nodeId.hashCode();
- result = 31 * result + segment;
- return result;
- }
- }
}
\ No newline at end of file
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java
similarity index 90%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java
index 851e1e4..3ff3a15 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java
@@ -22,7 +22,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.h2.index.Cursor;
@@ -30,9 +29,9 @@
import org.jetbrains.annotations.Nullable;
/**
- * Iterator that transparently and sequentially traverses a bunch of {@link GridMergeIndex} objects.
+ * Iterator that transparently and sequentially traverses a bunch of {@link ReduceIndex} objects.
*/
-class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable {
+public class ReduceIndexIterator implements Iterator<List<?>>, AutoCloseable {
/** Reduce query executor. */
private final GridReduceQueryExecutor rdcExec;
@@ -49,7 +48,7 @@
private final boolean distributedJoins;
/** Iterator over indexes. */
- private final Iterator<GridMergeIndex> idxIter;
+ private final Iterator<ReduceIndex> idxIter;
/** Current cursor. */
private Cursor cursor;
@@ -71,15 +70,14 @@
* @param run Query run.
* @param qryReqId Query request ID.
* @param distributedJoins Distributed joins.
- * @throws IgniteCheckedException if failed.
*/
- GridMergeIndexIterator(GridReduceQueryExecutor rdcExec,
+ public ReduceIndexIterator(GridReduceQueryExecutor rdcExec,
Collection<ClusterNode> nodes,
ReduceQueryRun run,
long qryReqId,
boolean distributedJoins,
- @Nullable MvccQueryTracker mvccTracker)
- throws IgniteCheckedException {
+ @Nullable MvccQueryTracker mvccTracker
+ ) {
this.rdcExec = rdcExec;
this.nodes = nodes;
this.run = run;
@@ -87,7 +85,7 @@
this.distributedJoins = distributedJoins;
this.mvccTracker = mvccTracker;
- this.idxIter = run.indexes().iterator();
+ idxIter = run.indexes().iterator();
advance();
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexSorted.java
similarity index 94%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexSorted.java
index 482752a..4dcfbbf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexSorted.java
@@ -54,7 +54,7 @@
/**
* Sorted index.
*/
-public final class GridMergeIndexSorted extends GridMergeIndex {
+public final class ReduceIndexSorted extends ReduceIndex {
/** */
private static final IndexType TYPE = IndexType.createNonUnique(false);
@@ -84,7 +84,7 @@
private final Condition notEmpty = lock.newCondition();
/** */
- private GridResultPage failPage;
+ private ReduceResultPage failPage;
/** */
private MergeStreamIterator it;
@@ -95,9 +95,9 @@
* @param name Index name,
* @param cols Columns.
*/
- public GridMergeIndexSorted(
+ public ReduceIndexSorted(
GridKernalContext ctx,
- GridMergeTable tbl,
+ ReduceTable tbl,
String name,
IndexColumn[] cols
) {
@@ -132,7 +132,7 @@
}
/** {@inheritDoc} */
- @Override protected void addPage0(GridResultPage page) {
+ @Override protected void addPage0(ReduceResultPage page) {
if (page.isFail()) {
lock.lock();
@@ -295,7 +295,7 @@
/**
* Row stream.
*/
- private final class RowStream implements Pollable<GridResultPage> {
+ private final class RowStream implements Pollable<ReduceResultPage> {
/** */
Iterator<Value[]> iter = emptyIterator();
@@ -303,12 +303,12 @@
Row cur;
/** */
- GridResultPage nextPage;
+ ReduceResultPage nextPage;
/**
* @param page Page.
*/
- private void addPage(GridResultPage page) {
+ private void addPage(ReduceResultPage page) {
assert !page.isFail();
if (page.isLast() && page.rowsInPage() == 0)
@@ -330,7 +330,7 @@
}
/** {@inheritDoc} */
- @Override public GridResultPage poll(long timeout, TimeUnit unit) throws InterruptedException {
+ @Override public ReduceResultPage poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
lock.lock();
@@ -340,7 +340,7 @@
if (failPage != null)
return failPage;
- GridResultPage page = nextPage;
+ ReduceResultPage page = nextPage;
if (page != null) {
// isLast && !isDummyLast
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexUnsorted.java
similarity index 90%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexUnsorted.java
index 4e9d11a..b0f1c5e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexUnsorted.java
@@ -42,12 +42,12 @@
/**
* Unsorted merge index.
*/
-public final class GridMergeIndexUnsorted extends GridMergeIndex {
+public final class ReduceIndexUnsorted extends ReduceIndex {
/** */
private static final IndexType TYPE = IndexType.createScan(false);
/** */
- private final PollableQueue<GridResultPage> queue = new PollableQueue<>();
+ private final PollableQueue<ReduceResultPage> queue = new PollableQueue<>();
/** */
private final AtomicInteger activeSources = new AtomicInteger(-1);
@@ -60,7 +60,7 @@
* @param tbl Table.
* @param name Index name.
*/
- public GridMergeIndexUnsorted(GridKernalContext ctx, GridMergeTable tbl, String name) {
+ public ReduceIndexUnsorted(GridKernalContext ctx, ReduceTable tbl, String name) {
super(ctx, tbl, name, TYPE, IndexColumn.wrap(tbl.getColumns()));
}
@@ -68,14 +68,14 @@
* @param ctx Context.
* @return Dummy index instance.
*/
- public static GridMergeIndexUnsorted createDummy(GridKernalContext ctx) {
- return new GridMergeIndexUnsorted(ctx);
+ public static ReduceIndexUnsorted createDummy(GridKernalContext ctx) {
+ return new ReduceIndexUnsorted(ctx);
}
/**
* @param ctx Context.
*/
- private GridMergeIndexUnsorted(GridKernalContext ctx) {
+ private ReduceIndexUnsorted(GridKernalContext ctx) {
super(ctx);
}
@@ -100,7 +100,7 @@
}
/** {@inheritDoc} */
- @Override protected void addPage0(GridResultPage page) {
+ @Override protected void addPage0(ReduceResultPage page) {
assert page.rowsInPage() > 0 || page.isLast() || page.isFail();
// Do not add empty page to avoid premature stream termination.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java
index 608f307..c898a5f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java
@@ -578,7 +578,7 @@
private Collection<ClusterNode> dataNodes(int grpId, AffinityTopologyVersion topVer) {
Collection<ClusterNode> res = ctx.discovery().cacheGroupAffinityNodes(grpId, topVer);
- return res != null ? res : Collections.<ClusterNode>emptySet();
+ return res != null ? res : Collections.emptySet();
}
/**
@@ -616,7 +616,7 @@
* @param cacheIds Cache IDs.
* @return The first partitioned cache context.
*/
- private GridCacheContext<?,?> findFirstPartitioned(List<Integer> cacheIds) {
+ public GridCacheContext<?,?> findFirstPartitioned(List<Integer> cacheIds) {
for (int i = 0; i < cacheIds.size(); i++) {
GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i));
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
index d2f7e9a..91ab3e5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
@@ -34,9 +34,9 @@
/**
* Query run.
*/
-class ReduceQueryRun {
+public class ReduceQueryRun {
/** */
- private final List<GridMergeIndex> idxs;
+ private final List<ReduceIndex> idxs;
/** */
private CountDownLatch latch;
@@ -72,7 +72,9 @@
Boolean dataPageScanEnabled
) {
this.conn = (JdbcConnection)conn;
- this.idxs = new ArrayList<>(idxsCnt);
+
+ idxs = new ArrayList<>(idxsCnt);
+
this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
this.selectForUpdateFut = selectForUpdateFut;
this.dataPageScanEnabled = dataPageScanEnabled;
@@ -130,7 +132,7 @@
while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
latch.countDown();
- for (GridMergeIndex idx : idxs) // Fail all merge indexes.
+ for (ReduceIndex idx : idxs) // Fail all merge indexes.
idx.fail(state.nodeId, state.ex);
}
@@ -199,7 +201,7 @@
/**
* @return Indexes.
*/
- List<GridMergeIndex> indexes() {
+ List<ReduceIndex> indexes() {
return idxs;
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceResultPage.java
similarity index 95%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceResultPage.java
index 0cb986b..4437dd6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceResultPage.java
@@ -17,14 +17,6 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
@@ -34,12 +26,21 @@
import org.apache.ignite.plugin.extensions.communication.Message;
import org.h2.value.Value;
+import javax.cache.CacheException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.fillArray;
/**
* Page result.
*/
-public class GridResultPage {
+public class ReduceResultPage {
/** */
private final UUID src;
@@ -61,7 +62,7 @@
* @param res Response.
*/
@SuppressWarnings("unchecked")
- public GridResultPage(final GridKernalContext ctx, UUID src, GridQueryNextPageResponse res) {
+ public ReduceResultPage(final GridKernalContext ctx, UUID src, GridQueryNextPageResponse res) {
assert src != null;
this.src = src;
@@ -158,7 +159,7 @@
* @param last Last page for a source.
* @return {@code this}.
*/
- public GridResultPage setLast(boolean last) {
+ public ReduceResultPage setLast(boolean last) {
this.last = last;
return this;
@@ -214,6 +215,6 @@
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridResultPage.class, this);
+ return S.toString(ReduceResultPage.class, this);
}
}
\ No newline at end of file
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceScanIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceScanIndex.java
new file mode 100644
index 0000000..f380145
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceScanIndex.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2ScanIndex;
+import org.h2.engine.Session;
+import org.h2.result.SortOrder;
+import org.h2.table.Column;
+import org.h2.table.TableFilter;
+
+import java.util.HashSet;
+
+/**
+ * Scan index wrapper.
+ */
+public class ReduceScanIndex extends GridH2ScanIndex<ReduceIndex> {
+ /**
+ * @param delegate Delegate.
+ */
+ public ReduceScanIndex(ReduceIndex delegate) {
+ super(delegate);
+ }
+
+ /** {@inheritDoc} */
+ @Override public double getCost(Session session, int[] masks, TableFilter[] filters, int filter,
+ SortOrder sortOrder, HashSet<Column> allColumnsSet) {
+ long rows = getRowCountApproximation();
+
+ return getCostRangeIndex(masks, rows, filters, filter, sortOrder, true, allColumnsSet);
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceSourceKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceSourceKey.java
new file mode 100644
index 0000000..e673722
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceSourceKey.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.util.typedef.F;
+
+import java.util.UUID;
+
+/**
+ * Reduce source key for a specific remote data source (remote node + specific segment).
+ */
+public class ReduceSourceKey {
+ /** Node ID. */
+ private final UUID nodeId;
+
+ /** Segment. */
+ private final int segment;
+
+ /**
+ * @param nodeId Node ID.
+ * @param segment Segment.
+ */
+ public ReduceSourceKey(UUID nodeId, int segment) {
+ this.nodeId = nodeId;
+ this.segment = segment;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ReduceSourceKey other = (ReduceSourceKey)o;
+
+ return F.eq(segment, other.segment) && F.eq(nodeId, other.nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return 31 * nodeId.hashCode() + segment;
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTable.java
similarity index 79%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTable.java
index 681917f..c520c2a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTable.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
import java.util.ArrayList;
-import java.util.HashSet;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2ScanIndex;
import org.apache.ignite.internal.util.typedef.F;
@@ -28,24 +27,24 @@
import org.h2.index.IndexType;
import org.h2.message.DbException;
import org.h2.result.Row;
-import org.h2.result.SortOrder;
-import org.h2.table.Column;
import org.h2.table.IndexColumn;
import org.h2.table.TableBase;
-import org.h2.table.TableFilter;
import org.h2.table.TableType;
/**
- * Merge table for distributed queries.
+ * Table for reduce phase. Created for every splitted map query. Tables are created in H2 through SQL statement.
+ * In order to avoid overhead on SQL execution for every query, we create {@link ReduceTableWrapper} instead
+ * and set real {@code ReduceTable} through thread-local during query execution. This allow us to have only
+ * very limited number of physical tables in H2 for all user requests.
*/
-public class GridMergeTable extends TableBase {
+public class ReduceTable extends TableBase {
/** */
private ArrayList<Index> idxs;
/**
* @param data Data.
*/
- public GridMergeTable(CreateTableData data) {
+ public ReduceTable(CreateTableData data) {
super(data);
}
@@ -61,16 +60,16 @@
/**
* @return Merge index.
*/
- public GridMergeIndex getMergeIndex() {
- return (GridMergeIndex)idxs.get(idxs.size() - 1); // Sorted index must be the last.
+ public ReduceIndex getMergeIndex() {
+ return (ReduceIndex)idxs.get(idxs.size() - 1); // Sorted index must be the last.
}
/**
* @param idx Index.
* @return Scan index.
*/
- public static GridH2ScanIndex<GridMergeIndex> createScanIndex(GridMergeIndex idx) {
- return new ScanIndex(idx);
+ public static GridH2ScanIndex<ReduceIndex> createScanIndex(ReduceIndex idx) {
+ return new ReduceScanIndex(idx);
}
/** {@inheritDoc} */
@@ -178,24 +177,4 @@
@Override public void checkRename() {
throw DbException.getUnsupportedException("rename");
}
-
- /**
- * Scan index wrapper.
- */
- private static class ScanIndex extends GridH2ScanIndex<GridMergeIndex> {
- /**
- * @param delegate Delegate.
- */
- public ScanIndex(GridMergeIndex delegate) {
- super(delegate);
- }
-
- /** {@inheritDoc} */
- @Override public double getCost(Session session, int[] masks, TableFilter[] filters, int filter,
- SortOrder sortOrder, HashSet<Column> allColumnsSet) {
- long rows = getRowCountApproximation();
-
- return getCostRangeIndex(masks, rows, filters, filter, sortOrder, true, allColumnsSet);
- }
- }
}
\ No newline at end of file
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTableEngine.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTableEngine.java
new file mode 100644
index 0000000..1ca2e48
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTableEngine.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.h2.api.TableEngine;
+import org.h2.command.ddl.CreateTableData;
+import org.h2.table.Table;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter.mergeTableIdentifier;
+
+/**
+ * Engine to create reduce table.
+ */
+public class ReduceTableEngine implements TableEngine {
+ /** */
+ private static final ThreadLocal<ReduceTableWrapper> CREATED_TBL = new ThreadLocal<>();
+
+ /**
+ * Create merge table over the given connection with provided index.
+ *
+ * @param conn Connection.
+ * @param idx Index.
+ * @return Created table.
+ */
+ public static ReduceTableWrapper create(Connection conn, int idx) {
+ try (Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate("CREATE TABLE " + mergeTableIdentifier(idx) +
+ "(fake BOOL) ENGINE \"" + ReduceTableEngine.class.getName() + '"');
+ }
+ catch (SQLException e) {
+ throw new IllegalStateException(e);
+ }
+
+ ReduceTableWrapper tbl = CREATED_TBL.get();
+
+ assert tbl != null;
+
+ CREATED_TBL.remove();
+
+ return tbl;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Table createTable(CreateTableData d) {
+ assert CREATED_TBL.get() == null;
+
+ ReduceTableWrapper tbl = new ReduceTableWrapper(
+ d.schema,
+ d.id,
+ d.tableName,
+ d.persistIndexes,
+ d.persistData
+ );
+
+ CREATED_TBL.set(tbl);
+
+ return tbl;
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTableWrapper.java
similarity index 85%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTableWrapper.java
index b01c3d4..ddc0898 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTableWrapper.java
@@ -21,8 +21,6 @@
import java.util.Arrays;
import java.util.HashSet;
import javax.cache.CacheException;
-import org.h2.api.TableEngine;
-import org.h2.command.ddl.CreateTableData;
import org.h2.engine.DbObject;
import org.h2.engine.Session;
import org.h2.index.Index;
@@ -41,11 +39,12 @@
import org.h2.value.Value;
/**
- * Thread local table wrapper for another table instance.
+ * Thread local table wrapper for real reducer table. All reduce queries share the same small set of this fake tables.
+ * During reduce query execution every thread installs it's own real reduce tables into thread-local storage.
*/
-public class GridThreadLocalTable extends Table {
+public class ReduceTableWrapper extends Table {
/** Delegate table */
- private final ThreadLocal<Table> tbl = new ThreadLocal<>();
+ private final ThreadLocal<ReduceTable> tbl = new ThreadLocal<>();
/**
* @param schema Schema.
@@ -54,14 +53,14 @@
* @param persistIndexes Persist indexes.
* @param persistData Persist data.
*/
- public GridThreadLocalTable(Schema schema, int id, String name, boolean persistIndexes, boolean persistData) {
+ public ReduceTableWrapper(Schema schema, int id, String name, boolean persistIndexes, boolean persistData) {
super(schema, id, name, persistIndexes, persistData);
}
/**
* @param t Table or {@code null} to reset existing.
*/
- public void innerTable(Table t) {
+ public void innerTable(ReduceTable t) {
if (t == null)
tbl.remove();
else
@@ -259,37 +258,4 @@
@Override public void checkRename() {
throw DbException.getUnsupportedException("rename");
}
-
- /**
- * Engine.
- */
- public static class Engine implements TableEngine {
- /** */
- private static ThreadLocal<GridThreadLocalTable> createdTbl = new ThreadLocal<>();
-
- /**
- * @return Created table.
- */
- public static GridThreadLocalTable getCreated() {
- GridThreadLocalTable tbl = createdTbl.get();
-
- assert tbl != null;
-
- createdTbl.remove();
-
- return tbl;
- }
-
- /** {@inheritDoc} */
- @Override public Table createTable(CreateTableData d) {
- assert createdTbl.get() == null;
-
- GridThreadLocalTable tbl = new GridThreadLocalTable(d.schema, d.id, d.tableName, d.persistIndexes,
- d.persistData);
-
- createdTbl.set(tbl);
-
- return tbl;
- }
- }
}
\ No newline at end of file
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index d6937b1..73964a4 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -47,7 +47,7 @@
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
-import org.apache.ignite.internal.processors.query.h2.twostep.GridMergeIndex;
+import org.apache.ignite.internal.processors.query.h2.twostep.ReduceIndex;
import org.apache.ignite.internal.util.GridRandom;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
@@ -575,7 +575,7 @@
Integer.class, Value.class));
try {
- GridTestUtils.setFieldValue(null, GridMergeIndex.class, "PREFETCH_SIZE", 8);
+ GridTestUtils.setFieldValue(null, ReduceIndex.class, "PREFETCH_SIZE", 8);
Random rnd = new GridRandom();
@@ -625,7 +625,7 @@
}
}
finally {
- GridTestUtils.setFieldValue(null, GridMergeIndex.class, "PREFETCH_SIZE", 1024);
+ GridTestUtils.setFieldValue(null, ReduceIndex.class, "PREFETCH_SIZE", 1024);
c.destroy();
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NoneOrSinglePartitionsQueryOptimizationsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NoneOrSinglePartitionsQueryOptimizationsTest.java
index 7e0aaa5..68c3e09 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NoneOrSinglePartitionsQueryOptimizationsTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NoneOrSinglePartitionsQueryOptimizationsTest.java
@@ -467,7 +467,7 @@
if (expMergeTbl)
assertTrue(innerIter instanceof H2ResultSetIterator);
else
- assertTrue(innerIter instanceof GridMergeIndexIterator);
+ assertTrue(innerIter instanceof ReduceIndexIterator);
List<List<?>> all = new ArrayList<>();
diff --git a/modules/jta/pom.xml b/modules/jta/pom.xml
index d55c2e6..742bc61 100644
--- a/modules/jta/pom.xml
+++ b/modules/jta/pom.xml
@@ -50,7 +50,7 @@
<dependency>
<groupId>org.ow2.jotm</groupId>
<artifactId>jotm-core</artifactId>
- <version>2.2.3</version>
+ <version>${jotm.version}</version>
<scope>test</scope>
</dependency>
@@ -95,7 +95,7 @@
<dependency>
<groupId>org.jboss.spec.javax.rmi</groupId>
<artifactId>jboss-rmi-api_1.0_spec</artifactId>
- <version>1.0.6.Final</version>
+ <version>${jboss.rmi.version}</version>
<scope>test</scope>
</dependency>
diff --git a/modules/web-console/e2e/testcafe/components/no-data.js b/modules/web-console/e2e/testcafe/components/no-data.js
new file mode 100644
index 0000000..12be3a6
--- /dev/null
+++ b/modules/web-console/e2e/testcafe/components/no-data.js
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import {Selector} from 'testcafe';
+
+export const noDataCmp = Selector('no-data');
diff --git a/modules/web-console/frontend/app/app.js b/modules/web-console/frontend/app/app.js
index 2c42c62..be4dc16 100644
--- a/modules/web-console/frontend/app/app.js
+++ b/modules/web-console/frontend/app/app.js
@@ -146,6 +146,7 @@
import sidebar from './components/web-console-sidebar';
import permanentNotifications from './components/permanent-notifications';
import signupConfirmation from './components/page-signup-confirmation';
+import noDataCmp from './components/no-data';
import igniteServices from './services';
@@ -247,7 +248,8 @@
sidebar.name,
permanentNotifications.name,
timedRedirection.name,
- signupConfirmation.name
+ signupConfirmation.name,
+ noDataCmp.name
])
.service('$exceptionHandler', $exceptionHandler)
// Directives.
@@ -306,11 +308,11 @@
// Set up the states.
$stateProvider
- .state('base', {
- url: '',
- abstract: true,
- template: baseTemplate
- });
+ .state('base', {
+ url: '',
+ abstract: true,
+ template: baseTemplate
+ });
$urlRouterProvider.otherwise('/404');
$locationProvider.html5Mode(true);
@@ -361,17 +363,17 @@
localStorage.setItem('lastStateChangeSuccess', JSON.stringify({name, params}));
}
catch (ignored) {
- // No-op.
+ // No-op.
}
});
}
])
.run(['$rootScope', '$http', '$state', 'IgniteMessages', 'User', 'IgniteNotebookData',
- /**
- * @param {ng.IRootScopeService} $root
- * @param {ng.IHttpService} $http
- * @param {ReturnType<typeof import('./services/Messages.service').default>} Messages
- */
+ /**
+ * @param {ng.IRootScopeService} $root
+ * @param {ng.IHttpService} $http
+ * @param {ReturnType<typeof import('./services/Messages.service').default>} Messages
+ */
($root, $http, $state, Messages, User, Notebook) => { // eslint-disable-line no-shadow
$root.revertIdentity = () => {
$http.get('/api/v1/admin/revert/identity')
@@ -383,8 +385,8 @@
}
])
.run(['IgniteIcon',
- /**
- * @param {import('./components/ignite-icon/service').default} IgniteIcon
- */
+ /**
+ * @param {import('./components/ignite-icon/service').default} IgniteIcon
+ */
(IgniteIcon) => IgniteIcon.registerIcons(icons)
]);
diff --git a/modules/web-console/frontend/app/components/ignite-chart/component.ts b/modules/web-console/frontend/app/components/ignite-chart/component.ts
new file mode 100644
index 0000000..91a816e
--- /dev/null
+++ b/modules/web-console/frontend/app/components/ignite-chart/component.ts
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { IgniteChartController } from './controller';
+import templateUrl from './template.tpl.pug';
+
+export default {
+ controller: IgniteChartController,
+ templateUrl,
+ bindings: {
+ chartOptions: '<',
+ chartDataPoint: '<',
+ chartHistory: '<',
+ chartTitle: '<',
+ chartColors: '<',
+ chartHeaderText: '<',
+ refreshRate: '<',
+ resultDataStatus: '<?'
+ },
+ transclude: true
+};
diff --git a/modules/web-console/frontend/app/components/ignite-chart/components/chart-no-data/component.ts b/modules/web-console/frontend/app/components/ignite-chart/components/chart-no-data/component.ts
new file mode 100644
index 0000000..9627940
--- /dev/null
+++ b/modules/web-console/frontend/app/components/ignite-chart/components/chart-no-data/component.ts
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import IgniteChartNoDataCtrl from './controller';
+import templateUrl from './template.tpl.pug';
+
+export default {
+ controller: IgniteChartNoDataCtrl,
+ templateUrl,
+ require: {
+ igniteChart: '^igniteChart'
+ },
+ bindings: {
+ resultDataStatus: '<',
+ handleClusterInactive: '<'
+ }
+} as ng.IComponentOptions;
diff --git a/modules/web-console/frontend/app/components/ignite-chart/components/chart-no-data/controller.ts b/modules/web-console/frontend/app/components/ignite-chart/components/chart-no-data/controller.ts
new file mode 100644
index 0000000..6acf8cf
--- /dev/null
+++ b/modules/web-console/frontend/app/components/ignite-chart/components/chart-no-data/controller.ts
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import {merge} from 'rxjs';
+import {tap, pluck, distinctUntilChanged} from 'rxjs/operators';
+
+import {WellKnownOperationStatus} from 'app/types';
+import {IgniteChartController} from '../../controller';
+
+const BLANK_STATUS = new Set([WellKnownOperationStatus.ERROR, WellKnownOperationStatus.WAITING]);
+
+export default class IgniteChartNoDataCtrl implements ng.IOnChanges, ng.IOnDestroy {
+ static $inject = ['AgentManager'];
+
+ constructor(private AgentManager) {}
+
+ igniteChart: IgniteChartController;
+
+ handleClusterInactive: boolean;
+
+ connectionState$ = this.AgentManager.connectionSbj.pipe(
+ pluck('state'),
+ distinctUntilChanged(),
+ tap((state) => {
+ if (state === 'AGENT_DISCONNECTED')
+ this.destroyChart();
+ })
+ );
+
+ cluster$ = this.AgentManager.connectionSbj.pipe(
+ pluck('cluster'),
+ distinctUntilChanged(),
+ tap((cluster) => {
+ if (!cluster && !this.AgentManager.isDemoMode()) {
+ this.destroyChart();
+ return;
+ }
+
+ if (!!cluster && cluster.active === false && this.handleClusterInactive)
+ this.destroyChart();
+
+ })
+ );
+
+ subsribers$ = merge(
+ this.connectionState$,
+ this.cluster$
+ ).subscribe();
+
+ $onChanges(changes) {
+ if (changes.resultDataStatus && BLANK_STATUS.has(changes.resultDataStatus.currentValue))
+ this.destroyChart();
+ }
+
+ $onDestroy() {
+ this.subsribers$.unsubscribe();
+ }
+
+ destroyChart() {
+ if (this.igniteChart && this.igniteChart.chart) {
+ this.igniteChart.chart.destroy();
+ this.igniteChart.config = null;
+ this.igniteChart.chart = null;
+ }
+ }
+}
diff --git a/modules/web-console/frontend/app/components/ignite-chart/components/chart-no-data/index.ts b/modules/web-console/frontend/app/components/ignite-chart/components/chart-no-data/index.ts
new file mode 100644
index 0000000..d36668f
--- /dev/null
+++ b/modules/web-console/frontend/app/components/ignite-chart/components/chart-no-data/index.ts
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import angular from 'angular';
+
+import IgniteChartNoDataCmp from './component';
+
+export default angular.module('ignite-console.ignite-chart.chart-no-data', [])
+ .component('chartNoData', IgniteChartNoDataCmp);
diff --git a/modules/web-console/frontend/app/components/ignite-chart/components/chart-no-data/template.tpl.pug b/modules/web-console/frontend/app/components/ignite-chart/components/chart-no-data/template.tpl.pug
new file mode 100644
index 0000000..bbf2598
--- /dev/null
+++ b/modules/web-console/frontend/app/components/ignite-chart/components/chart-no-data/template.tpl.pug
@@ -0,0 +1,20 @@
+//-
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+no-data(result-data-status='$ctrl.resultDataStatus' handle-cluster-inactive='$ctrl.handleClusterInactive').no-data
+ div(ng-if='!$ctrl.igniteChart.config.data.datasets')
+ | No Data #[br]
+ | Make sure you are connected to the right grid.
diff --git a/modules/web-console/frontend/app/components/ignite-chart/index.js b/modules/web-console/frontend/app/components/ignite-chart/index.js
index 6900814..957a2ae 100644
--- a/modules/web-console/frontend/app/components/ignite-chart/index.js
+++ b/modules/web-console/frontend/app/components/ignite-chart/index.js
@@ -17,22 +17,10 @@
import angular from 'angular';
-import { IgniteChartController } from './controller';
-import template from './template.pug';
+import chartNoData from './components/chart-no-data';
+import IgniteChartCmp from './component';
import './style.scss';
export default angular
- .module('ignite-console.ignite-chart', [])
- .component('igniteChart', {
- controller: IgniteChartController,
- template,
- bindings: {
- chartOptions: '<',
- chartDataPoint: '<',
- chartHistory: '<',
- chartTitle: '<',
- chartColors: '<',
- chartHeaderText: '<',
- refreshRate: '<'
- }
- });
+ .module('ignite-console.ignite-chart', [chartNoData.name])
+ .component('igniteChart', IgniteChartCmp);
diff --git a/modules/web-console/frontend/app/components/ignite-chart/template.pug b/modules/web-console/frontend/app/components/ignite-chart/template.tpl.pug
similarity index 91%
rename from modules/web-console/frontend/app/components/ignite-chart/template.pug
rename to modules/web-console/frontend/app/components/ignite-chart/template.tpl.pug
index 5108853..bfc4c69 100644
--- a/modules/web-console/frontend/app/components/ignite-chart/template.pug
+++ b/modules/web-console/frontend/app/components/ignite-chart/template.tpl.pug
@@ -32,6 +32,4 @@
.ignite-chart-placeholder
canvas
-.no-data(ng-if='!$ctrl.config.data.datasets')
- | No Data #[br]
- | Make sure you are connected to the right grid.
+ng-transclude
\ No newline at end of file
diff --git a/modules/web-console/frontend/app/components/no-data/component.ts b/modules/web-console/frontend/app/components/no-data/component.ts
new file mode 100644
index 0000000..67df1ed
--- /dev/null
+++ b/modules/web-console/frontend/app/components/no-data/component.ts
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import templateUrl from './template.tpl.pug';
+import controller from './controller';
+
+import './style.scss';
+
+export default {
+ controller,
+ templateUrl,
+ transclude: true,
+ bindings: {
+ resultDataStatus: '<',
+ handleClusterInactive: '<'
+ }
+} as ng.IComponentOptions;
diff --git a/modules/web-console/frontend/app/components/no-data/controller.ts b/modules/web-console/frontend/app/components/no-data/controller.ts
new file mode 100644
index 0000000..fa2b540
--- /dev/null
+++ b/modules/web-console/frontend/app/components/no-data/controller.ts
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import _ from 'lodash';
+import {of} from 'rxjs';
+import {distinctUntilChanged, switchMap} from 'rxjs/operators';
+
+export default class NoDataCmpCtrl {
+ static $inject = ['AgentManager', 'AgentModal'];
+
+ connectionState$ = this.AgentManager.connectionSbj.pipe(
+ switchMap((sbj) => {
+ if (!_.isNil(sbj.cluster) && sbj.cluster.active === false)
+ return of('CLUSTER_INACTIVE');
+
+ return of(sbj.state);
+ }),
+ distinctUntilChanged()
+ );
+
+ backText = 'Close';
+
+ constructor(private AgentManager, private AgentModal) {}
+
+ openAgentMissingDialog() {
+ this.AgentModal.agentDisconnected(this.backText, '.');
+ }
+
+ openNodeMissingDialog() {
+ this.AgentModal.clusterDisconnected(this.backText, '.');
+ }
+}
diff --git a/modules/web-console/frontend/app/components/no-data/index.ts b/modules/web-console/frontend/app/components/no-data/index.ts
new file mode 100644
index 0000000..8959ba5
--- /dev/null
+++ b/modules/web-console/frontend/app/components/no-data/index.ts
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import angular from 'angular';
+
+import noDataCmp from './component';
+import './style.scss';
+
+export default angular
+ .module('ignite-console.no-data', [])
+ .component('noData', noDataCmp);
diff --git a/modules/web-console/frontend/app/components/no-data/style.scss b/modules/web-console/frontend/app/components/no-data/style.scss
new file mode 100644
index 0000000..4f16bed
--- /dev/null
+++ b/modules/web-console/frontend/app/components/no-data/style.scss
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+no-data{
+ .data-loading-wrapper {
+ display: flex;
+ align-items: center;
+ justify-content: center;
+
+ .spinner-circle {
+ margin-right: 5px;
+ }
+ }
+}
diff --git a/modules/web-console/frontend/app/components/no-data/template.tpl.pug b/modules/web-console/frontend/app/components/no-data/template.tpl.pug
new file mode 100644
index 0000000..cc3f96c
--- /dev/null
+++ b/modules/web-console/frontend/app/components/no-data/template.tpl.pug
@@ -0,0 +1,35 @@
+//-
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+div(ng-switch='($ctrl.connectionState$ | async:this)')
+ div(ng-switch-when='AGENT_DISCONNECTED')
+ | Agent is disconnected. #[a(ng-click='$ctrl.openAgentMissingDialog()') Check] agent is up and running.
+
+ div(ng-switch-when='CLUSTER_DISCONNECTED')
+ | Cluster is not available. #[a(ng-click='$ctrl.openNodeMissingDialog()') Check] cluster is up and running and agent is appropriately #[a(href="https://apacheignite-tools.readme.io/docs/getting-started#section-configuration" target="_blank") configured].
+
+ div(ng-switch-when='CLUSTER_INACTIVE')
+ div(ng-if='$ctrl.handleClusterInactive') Cluster is inactive. Please activate cluster.
+ div(ng-if='!$ctrl.handleClusterInactive')
+ ng-transclude
+
+ div(ng-switch-default)
+ .data-loading-wrapper(ng-if='$ctrl.resultDataStatus === "WAITING"')
+ .spinner-circle
+ div Data is loading...
+
+ div(ng-if='$ctrl.resultDataStatus !== "WAITING"')
+ ng-transclude
diff --git a/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/components/ignite-information/information.directive.js b/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/components/ignite-information/information.directive.js
index 8e95791..d7f3529 100644
--- a/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/components/ignite-information/information.directive.js
+++ b/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/components/ignite-information/information.directive.js
@@ -15,6 +15,7 @@
* limitations under the License.
*/
+import './information.scss';
import template from './information.pug';
export default function() {
diff --git a/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/controller.ts b/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/controller.ts
index 76aae1c..6d17aca 100644
--- a/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/controller.ts
+++ b/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/controller.ts
@@ -18,8 +18,8 @@
import _ from 'lodash';
import {nonEmpty, nonNil} from 'app/utils/lodashMixins';
import id8 from 'app/utils/id8';
-import {timer, merge, defer, from, of} from 'rxjs';
-import {mergeMap, tap, switchMap, exhaustMap, take, filter, map, catchError} from 'rxjs/operators';
+import {timer, merge, defer, of} from 'rxjs';
+import {tap, switchMap, exhaustMap, take, pluck, distinctUntilChanged, filter, map, catchError} from 'rxjs/operators';
import {CSV} from 'app/services/CSV';
@@ -923,9 +923,6 @@
};
const _startWatch = () => {
- const awaitClusters$ = from(
- agentMgr.startClusterWatch('Leave Queries', 'default-state'));
-
const finishLoading$ = defer(() => {
if (!$root.IgniteDemoMode)
Loading.finish('sqlLoading');
@@ -935,17 +932,32 @@
return merge(timer(0, period).pipe(exhaustMap(() => _refreshCaches())), finishLoading$);
};
- this.refresh$ = awaitClusters$.pipe(
- mergeMap(() => agentMgr.currentCluster$),
- tap(() => Loading.start('sqlLoading')),
- tap(() => {
- _.forEach($scope.notebook.paragraphs, (paragraph) => {
- paragraph.reset($interval);
- });
- }),
- switchMap(() => refreshCaches(5000))
- )
- .subscribe();
+ const cluster$ = agentMgr.connectionSbj.pipe(
+ pluck('cluster'),
+ distinctUntilChanged(),
+ tap((cluster) => {
+ this.clusterIsAvailable = (!!cluster && cluster.active === true) || agentMgr.isDemoMode();
+ })
+ );
+
+ this.refresh$ = cluster$.pipe(
+ switchMap((cluster) => {
+ if (!cluster && !agentMgr.isDemoMode())
+ return of(null);
+
+ return of(cluster).pipe(
+ tap(() => Loading.start('sqlLoading')),
+ tap(() => {
+ _.forEach($scope.notebook.paragraphs, (paragraph) => {
+ paragraph.reset($interval);
+ });
+ }),
+ switchMap(() => refreshCaches(5000))
+ );
+ })
+ );
+
+ this.subscribers$ = merge(this.refresh$).subscribe();
};
const _newParagraph = (paragraph) => {
@@ -1442,7 +1454,8 @@
}
return nids[_.random(0, nids.length - 1)];
- });
+ })
+ .catch(Messages.showError);
};
const _executeRefresh = (paragraph) => {
@@ -1588,6 +1601,8 @@
_showLoading(paragraph, false);
$scope.stopRefresh(paragraph);
+
+ Messages.showError(err);
})
.then(() => paragraph.ace.focus());
});
@@ -2168,7 +2183,7 @@
$onDestroy() {
this._closeOpenedQueries(this.$scope.notebook.paragraphs);
- if (this.refresh$)
- this.refresh$.unsubscribe();
+ if (this.subscribers$)
+ this.subscribers$.unsubscribe();
}
}
diff --git a/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/style.scss b/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/style.scss
index 4978701..e632cdb 100644
--- a/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/style.scss
+++ b/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/style.scss
@@ -15,6 +15,8 @@
* limitations under the License.
*/
+@import "../../../../../public/stylesheets/variables.scss";
+
queries-notebook {
// TODO: Delete this breadcrumbs styles after template refactoring to new design.
.notebooks-top {
@@ -104,6 +106,15 @@
}
}
+ .empty-caches {
+ color: $ignite-placeholder-color;
+ display: flex;
+ padding: 10px;
+ align-items: center;
+ justify-content: center;
+ text-align: center;
+ }
+
.notebook-top-buttons {
display: flex;
align-items: center;
diff --git a/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/template.tpl.pug b/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/template.tpl.pug
index c4033e6..e7334cb 100644
--- a/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/template.tpl.pug
+++ b/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/template.tpl.pug
@@ -403,9 +403,11 @@
tipOpts: { placement: 'top' }
})
.empty-caches(ng-show='displayedCaches.length == 0 && caches.length != 0')
- label Wrong caches filter
+ no-data
+ label Wrong caches filter
.empty-caches(ng-show='caches.length == 0')
- label No caches
+ no-data
+ label No caches
.col-sm-12.sql-controls
div
+query-actions
diff --git a/modules/web-console/frontend/app/configuration/components/modal-import-models/component.js b/modules/web-console/frontend/app/configuration/components/modal-import-models/component.js
index 098bb67..0dbf4f1 100644
--- a/modules/web-console/frontend/app/configuration/components/modal-import-models/component.js
+++ b/modules/web-console/frontend/app/configuration/components/modal-import-models/component.js
@@ -21,8 +21,8 @@
import naturalCompare from 'natural-compare-lite';
import find from 'lodash/fp/find';
import get from 'lodash/fp/get';
-import {race, timer, merge, of, from, combineLatest} from 'rxjs';
-import {tap, filter, take, pluck, switchMap} from 'rxjs/operators';
+import {race, timer, merge, of, from, combineLatest, EMPTY} from 'rxjs';
+import {tap, filter, take, pluck, switchMap, map} from 'rxjs/operators';
import ObjectID from 'bson-objectid';
import {uniqueName} from 'app/utils/uniqueName';
import {defaultNames} from '../../defaultNames';
@@ -94,14 +94,13 @@
/** @type {ng.ICompiledExpression} */
onHide;
- static $inject = ['$uiRouter', 'ConfigSelectors', 'ConfigEffects', 'ConfigureState', '$http', 'IgniteConfirm', 'IgniteConfirmBatch', 'IgniteFocus', 'SqlTypes', 'JavaTypes', 'IgniteMessages', '$scope', '$rootScope', 'AgentManager', 'IgniteActivitiesData', 'IgniteLoading', 'IgniteFormUtils', 'IgniteLegacyUtils'];
+ static $inject = ['$uiRouter', 'ConfigSelectors', 'ConfigEffects', 'ConfigureState', 'IgniteConfirm', 'IgniteConfirmBatch', 'IgniteFocus', 'SqlTypes', 'JavaTypes', 'IgniteMessages', '$scope', '$rootScope', 'AgentManager', 'IgniteActivitiesData', 'IgniteLoading', 'IgniteFormUtils', 'IgniteLegacyUtils'];
/**
* @param {UIRouter} $uiRouter
* @param {ConfigSelectors} ConfigSelectors
* @param {ConfigEffects} ConfigEffects
* @param {ConfigureState} ConfigureState
- * @param {ng.IHttpService} $http
* @param {IgniteConfirmBatch} ConfirmBatch
* @param {SqlTypes} SqlTypes
* @param {JavaTypes} JavaTypes
@@ -110,10 +109,9 @@
* @param {AgentManager} agentMgr
* @param {ActivitiesData} ActivitiesData
*/
- constructor($uiRouter, ConfigSelectors, ConfigEffects, ConfigureState, $http, Confirm, ConfirmBatch, Focus, SqlTypes, JavaTypes, Messages, $scope, $root, agentMgr, ActivitiesData, Loading, FormUtils, LegacyUtils) {
+ constructor($uiRouter, ConfigSelectors, ConfigEffects, ConfigureState, Confirm, ConfirmBatch, Focus, SqlTypes, JavaTypes, Messages, $scope, $root, agentMgr, ActivitiesData, Loading, FormUtils, LegacyUtils) {
this.$uiRouter = $uiRouter;
this.ConfirmBatch = ConfirmBatch;
- this.$http = $http;
this.ConfigSelectors = ConfigSelectors;
this.ConfigEffects = ConfigEffects;
this.ConfigureState = ConfigureState;
@@ -259,14 +257,14 @@
}
$onDestroy() {
- this.subscription.unsubscribe();
+ this.subscribers$.unsubscribe();
if (this.onCacheSelectSubcription) this.onCacheSelectSubcription.unsubscribe();
if (this.saveSubscription) this.saveSubscription.unsubscribe();
}
$onInit() {
// Restores old behavior
- const {$http, Confirm, ConfirmBatch, Focus, SqlTypes, JavaTypes, Messages, $scope, $root, agentMgr, ActivitiesData, Loading, FormUtils, LegacyUtils} = this;
+ const {Confirm, ConfirmBatch, Focus, SqlTypes, JavaTypes, Messages, $scope, $root, agentMgr, ActivitiesData, Loading, FormUtils, LegacyUtils} = this;
/**
* Convert some name to valid java package name.
@@ -317,7 +315,7 @@
}
this.$scope.$watch('importCommon.action', this._fillCommonCachesOrTemplates(this.$scope.importCommon), true);
this.$scope.importCommon.action = IMPORT_DM_NEW_CACHE;
- })).subscribe();
+ }));
// New
this.loadedCaches = {
@@ -1006,63 +1004,84 @@
$scope.importDomain.loadingOptions = LOADING_JDBC_DRIVERS;
- agentMgr.startAgentWatch('Back', this.$uiRouter.globals.current.name)
- .then(() => {
- ActivitiesData.post({
- group: 'configuration',
- action: 'configuration/import/model'
- });
-
- return true;
- })
- .then(() => {
- if (demo) {
- $scope.ui.packageNameUserInput = $scope.ui.packageName;
- $scope.ui.packageName = 'model';
-
- return;
- }
-
- // Get available JDBC drivers via agent.
- Loading.start('importDomainFromDb');
-
- $scope.jdbcDriverJars = [];
- $scope.ui.selectedJdbcDriverJar = {};
-
- return agentMgr.drivers()
- .then((drivers) => {
- $scope.ui.packageName = $scope.ui.packageNameUserInput;
-
- if (drivers && drivers.length > 0) {
- drivers = _.sortBy(drivers, 'jdbcDriverJar');
-
- _.forEach(drivers, (drv) => {
- $scope.jdbcDriverJars.push({
- label: drv.jdbcDriverJar,
- value: {
- jdbcDriverJar: drv.jdbcDriverJar,
- jdbcDriverClass: drv.jdbcDriverCls
- }
- });
- });
-
- $scope.ui.selectedJdbcDriverJar = $scope.jdbcDriverJars[0].value;
-
- $scope.importDomain.action = 'connect';
- $scope.importDomain.tables = [];
- this.selectedTables = [];
- }
- else {
- $scope.importDomain.jdbcDriversNotFound = true;
- $scope.importDomain.button = 'Cancel';
- }
- })
- .then(() => {
- $scope.importDomain.info = INFO_CONNECT_TO_DB;
-
- Loading.finish('importDomainFromDb');
+ const fetchDomainData = () => {
+ return agentMgr.awaitAgent()
+ .then(() => {
+ ActivitiesData.post({
+ group: 'configuration',
+ action: 'configuration/import/model'
});
- });
+
+ return true;
+ })
+ .then(() => {
+ if (demo) {
+ $scope.ui.packageNameUserInput = $scope.ui.packageName;
+ $scope.ui.packageName = 'model';
+
+ return;
+ }
+
+ // Get available JDBC drivers via agent.
+ Loading.start('importDomainFromDb');
+
+ $scope.jdbcDriverJars = [];
+ $scope.ui.selectedJdbcDriverJar = {};
+
+ return agentMgr.drivers()
+ .then((drivers) => {
+ $scope.ui.packageName = $scope.ui.packageNameUserInput;
+
+ if (drivers && drivers.length > 0) {
+ drivers = _.sortBy(drivers, 'jdbcDriverJar');
+
+ _.forEach(drivers, (drv) => {
+ $scope.jdbcDriverJars.push({
+ label: drv.jdbcDriverJar,
+ value: {
+ jdbcDriverJar: drv.jdbcDriverJar,
+ jdbcDriverClass: drv.jdbcDriverCls
+ }
+ });
+ });
+
+ $scope.ui.selectedJdbcDriverJar = $scope.jdbcDriverJars[0].value;
+
+ $scope.importDomain.action = 'connect';
+ $scope.importDomain.tables = [];
+ this.selectedTables = [];
+ }
+ else {
+ $scope.importDomain.jdbcDriversNotFound = true;
+ $scope.importDomain.button = 'Cancel';
+ }
+ })
+ .then(() => {
+ $scope.importDomain.info = INFO_CONNECT_TO_DB;
+
+ Loading.finish('importDomainFromDb');
+ });
+ });
+ };
+
+ this.agentIsAvailable$ = this.agentMgr.connectionSbj.pipe(
+ pluck('state'),
+ map((state) => state !== 'AGENT_DISCONNECTED')
+ );
+
+ this.domainData$ = this.agentIsAvailable$.pipe(
+ switchMap((agentIsAvailable) => {
+ if (!agentIsAvailable)
+ return of(EMPTY);
+
+ return from(fetchDomainData());
+ })
+ );
+
+ this.subscribers$ = merge(
+ this.subscription,
+ this.domainData$
+ ).subscribe();
$scope.$watch('ui.selectedJdbcDriverJar', function(val) {
if (val && !$scope.importDomain.demo) {
diff --git a/modules/web-console/frontend/app/configuration/components/modal-import-models/service.ts b/modules/web-console/frontend/app/configuration/components/modal-import-models/service.ts
index 891b905..c440fe8 100644
--- a/modules/web-console/frontend/app/configuration/components/modal-import-models/service.ts
+++ b/modules/web-console/frontend/app/configuration/components/modal-import-models/service.ts
@@ -84,8 +84,7 @@
show: false
});
- return this.AgentManager.startAgentWatch('Back', this.$uiRouter.globals.current.name)
- .then(() => this._modal.$promise)
+ return this._modal.$promise
.then(() => this._modal.show())
.then(() => this.deferred.promise)
.finally(() => this.deferred = null);
diff --git a/modules/web-console/frontend/app/configuration/components/modal-import-models/style.scss b/modules/web-console/frontend/app/configuration/components/modal-import-models/style.scss
index 5c109b4..b0a5877 100644
--- a/modules/web-console/frontend/app/configuration/components/modal-import-models/style.scss
+++ b/modules/web-console/frontend/app/configuration/components/modal-import-models/style.scss
@@ -50,4 +50,12 @@
.#{&}__next-button {
margin-left: auto !important;
}
+
+ no-data {
+ margin: 30px;
+ display: flex;
+ align-items: center;
+ justify-content: center;
+ height: 300px;
+ }
}
\ No newline at end of file
diff --git a/modules/web-console/frontend/app/configuration/components/modal-import-models/template.tpl.pug b/modules/web-console/frontend/app/configuration/components/modal-import-models/template.tpl.pug
index ff3bafe..9b4c597 100644
--- a/modules/web-console/frontend/app/configuration/components/modal-import-models/template.tpl.pug
+++ b/modules/web-console/frontend/app/configuration/components/modal-import-models/template.tpl.pug
@@ -33,7 +33,7 @@
h4.modal-title()
span(ng-if='!importDomain.demo') Import domain models from database
span(ng-if='importDomain.demo') Import domain models from demo database
- .modal-body
+ .modal-body(ng-if-start='$ctrl.agentIsAvailable$ | async: this')
modal-import-models-step-indicator(
steps='$ctrl.actions'
current-step='importDomain.action'
@@ -226,7 +226,7 @@
tipOpts
})
- .modal-footer
+ .modal-footer(ng-if-end)
button.btn-ignite.btn-ignite--success.modal-import-models__prev-button(
type='button'
ng-hide='importDomain.action == "drivers" || importDomain.action == "connect"'
@@ -255,3 +255,9 @@
)
svg.icon-left(ignite-icon='checkmark' ng-show='importDomain.button === "Save"')
| {{importDomain.button}}
+
+ .modal-body(ng-if-start='!($ctrl.agentIsAvailable$ | async: this)')
+ no-data
+
+ .modal-footer(ng-if-end)
+ button.btn-ignite.btn-ignite--success.modal-import-models__next-button(ng-click='$ctrl.onHide()') Cancel
diff --git a/modules/web-console/frontend/app/modules/agent/AgentManager.service.js b/modules/web-console/frontend/app/modules/agent/AgentManager.service.js
index 8e4dd2d..23953cb 100644
--- a/modules/web-console/frontend/app/modules/agent/AgentManager.service.js
+++ b/modules/web-console/frontend/app/modules/agent/AgentManager.service.js
@@ -193,6 +193,11 @@
pluck('active')
);
+ this.clusterIsAvailable$ = this.connectionSbj.pipe(
+ pluck('cluster'),
+ map((cluster) => !!cluster)
+ );
+
if (!this.isDemoMode()) {
this.connectionSbj.subscribe({
next: ({cluster}) => {
@@ -354,58 +359,6 @@
return this.awaitAgent();
}
- /**
- * @param {String} backText
- * @param {String} [backState]
- * @returns {ng.IPromise}
- */
- startClusterWatch(backText, backState) {
- this.backText = backText;
- this.backState = backState;
-
- const conn = this.connectionSbj.getValue();
-
- conn.useConnectedCluster();
-
- this.connectionSbj.next(conn);
-
- this.modalSubscription && this.modalSubscription.unsubscribe();
-
- this.modalSubscription = this.connectionSbj.subscribe({
- next: ({state}) => {
- switch (state) {
- case State.CONNECTED:
- this.agentModal.hide();
-
- break;
-
- case State.AGENT_DISCONNECTED:
- this.agentModal.agentDisconnected(this.backText, this.backState);
- this.ClusterLoginSrv.cancel();
-
- break;
-
- case State.CLUSTER_DISCONNECTED:
- this.agentModal.clusterDisconnected(this.backText, this.backState);
- this.ClusterLoginSrv.cancel();
-
- break;
-
- default:
- // Connection to backend is not established yet.
- }
- }
- });
-
- const stopWatchUnsubscribe = this.$transitions.onExit({}, () => {
- this.stopWatch();
-
- stopWatchUnsubscribe();
- });
-
- return this.awaitCluster();
- }
-
stopWatch() {
this.modalSubscription && this.modalSubscription.unsubscribe();
diff --git a/modules/web-console/frontend/app/types/index.ts b/modules/web-console/frontend/app/types/index.ts
index 89f495e..905558a 100644
--- a/modules/web-console/frontend/app/types/index.ts
+++ b/modules/web-console/frontend/app/types/index.ts
@@ -74,3 +74,9 @@
notifyAboutError(): void
hideError(): void
}
+
+export enum WellKnownOperationStatus {
+ WAITING = 'WAITING',
+ ERROR = 'ERROR',
+ DONE = 'DONE'
+}
diff --git a/modules/web-console/frontend/public/stylesheets/style.scss b/modules/web-console/frontend/public/stylesheets/style.scss
index 52f6250..15cdb65 100644
--- a/modules/web-console/frontend/public/stylesheets/style.scss
+++ b/modules/web-console/frontend/public/stylesheets/style.scss
@@ -536,13 +536,6 @@
line-height: 65px;
}
- .empty-caches {
- text-align: center;
- color: $ignite-placeholder-color;
- height: 55px;
- line-height: 55px;
- }
-
.panel-collapse {
border-top: 1px solid $ignite-border-color;
}
diff --git a/parent/pom.xml b/parent/pom.xml
index d8b6b75..3b45179 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -77,16 +77,20 @@
<httpcore.version>4.4.3</httpcore.version>
<jackson.version>2.9.6</jackson.version>
<jackson1.version>1.9.13</jackson1.version>
+ <jaxb.api.version>2.1</jaxb.api.version>
+ <jaxb.impl.version>2.1.14</jaxb.impl.version>
<javassist.version>3.20.0-GA</javassist.version>
<javax.cache.bundle.version>1.0.0_1</javax.cache.bundle.version>
<javax.cache.tck.version>1.0.1</javax.cache.tck.version>
<javax.cache.version>1.0.0</javax.cache.version>
+ <jboss.rmi.version>1.0.6.Final</jboss.rmi.version>
<jetbrains.annotations.version>16.0.3</jetbrains.annotations.version>
<jetty.version>9.4.11.v20180605</jetty.version>
<jmh.version>1.13</jmh.version>
<jms.spec.version>1.1.1</jms.spec.version>
<jna.version>4.5.2</jna.version>
<jnr.posix.version>3.0.46</jnr.posix.version>
+ <jotm.version>2.2.3</jotm.version>
<jsch.bundle.version>0.1.54_1</jsch.bundle.version>
<jsch.version>0.1.54</jsch.version>
<jsonlib.bundle.version>2.4_1</jsonlib.bundle.version>