blob: 1ee56d521766b45ba53c241765a7aa99ee0cb3dd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.systemview;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.systemview.SystemViewManagerImpl.NODE_ATTRIBUTES_KEY;
import static org.apache.ignite.internal.systemview.SystemViewManagerImpl.NODE_ATTRIBUTES_LIST_SEPARATOR;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.CollectionUtils.first;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.SubscriptionUtils.fromIterable;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.function.Function;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.lang.InternalTuple;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.schema.SchemaTestUtils;
import org.apache.ignite.internal.systemview.api.SystemView;
import org.apache.ignite.internal.systemview.api.SystemViews;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.type.NativeType;
import org.apache.ignite.internal.type.NativeTypeSpec;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
/**
* Test class for {@link SystemViewManagerImpl}.
*/
@ExtendWith(MockitoExtension.class)
public class SystemViewManagerTest extends BaseIgniteAbstractTest {
private static final String LOCAL_NODE_NAME = "LOCAL_NODE_NAME";
private final CatalogManager catalog = Mockito.mock(CatalogManager.class);
private SystemViewManagerImpl viewMgr;
@BeforeEach
void setUp() {
viewMgr = new SystemViewManagerImpl(LOCAL_NODE_NAME, catalog);
}
@Test
public void registerDuplicateNameFails() {
String name = "testView1";
SystemView<?> view = dummyView(name);
SystemView<?> viewWithSameName = dummyView(name);
viewMgr.register(() -> List.of(view));
assertThrows(IllegalArgumentException.class, () -> viewMgr.register(() -> List.of(viewWithSameName)));
verifyNoInteractions(catalog);
}
@Test
public void registerAfterStartFails() {
assertThat(viewMgr.startAsync(), willCompleteSuccessfully());
assertThrows(IllegalStateException.class, () -> viewMgr.register(() -> List.of(dummyView("test"))));
verifyNoInteractions(catalog);
}
@Test
public void startAfterStartFails() {
when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
viewMgr.register(() -> List.of(dummyView("test")));
assertThat(viewMgr.startAsync(), willCompleteSuccessfully());
verify(catalog, times(1)).execute(anyList());
reset(catalog);
assertThrows(IllegalStateException.class, viewMgr::startAsync);
assertThrows(IllegalStateException.class, viewMgr::startAsync);
verifyNoMoreInteractions(catalog);
}
@Test
public void registrationCompletesWithoutViews() {
assertThat(viewMgr.startAsync(), willCompleteSuccessfully());
verifyNoMoreInteractions(catalog);
assertTrue(viewMgr.completeRegistration().isDone());
}
@ParameterizedTest
@EnumSource(NativeTypeSpec.class)
public void registerAllColumnTypes(NativeTypeSpec typeSpec) {
NativeType type = SchemaTestUtils.specToType(typeSpec);
when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
viewMgr.register(() -> List.of(dummyView("test", type)));
assertThat(viewMgr.startAsync(), willCompleteSuccessfully());
verify(catalog, times(1)).execute(anyList());
assertTrue(viewMgr.completeRegistration().isDone());
}
@Test
public void managerStartsSuccessfullyEvenIfCatalogRespondsWithError() {
CatalogValidationException expected = new CatalogValidationException("Expected exception.");
when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
when(catalog.execute(anyList())).thenReturn(failedFuture(expected));
viewMgr.register(() -> List.of(dummyView("test")));
assertThat(viewMgr.startAsync(), willCompleteSuccessfully());
verify(catalog, times(1)).execute(anyList());
assertThat(viewMgr.completeRegistration(), willBe(nullValue()));
}
@Test
public void nodeAttributesUpdatedAfterStart() {
when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
String name1 = "view1";
String name2 = "view2";
viewMgr.register(() -> List.of(dummyView(name1), dummyView(name2)));
assertThat(viewMgr.nodeAttributes(), aMapWithSize(0));
assertThat(viewMgr.startAsync(), willCompleteSuccessfully());
assertThat(viewMgr.nodeAttributes(), is(Map.of(NODE_ATTRIBUTES_KEY, String.join(NODE_ATTRIBUTES_LIST_SEPARATOR, name1.toUpperCase(
Locale.ROOT), name2.toUpperCase(Locale.ROOT)))));
}
@Test
public void registrationFutureCompletesWhenComponentStops() {
assertThat(viewMgr.stopAsync(), willCompleteSuccessfully());
assertThat(viewMgr.completeRegistration(), willThrowFast(NodeStoppingException.class));
}
@Test
public void startAfterStopFails() {
assertThat(viewMgr.stopAsync(), willCompleteSuccessfully());
//noinspection ThrowableNotThrown
assertThrowsWithCause(viewMgr::startAsync, NodeStoppingException.class);
}
@Test
public void registerAfterStopFails() {
assertThat(viewMgr.stopAsync(), willCompleteSuccessfully());
//noinspection ThrowableNotThrown
assertThrowsWithCause(() -> viewMgr.register(() -> List.of(dummyView("test"))), NodeStoppingException.class);
}
@Test
public void stopAfterStopDoesNothing() {
assertThat(viewMgr.stopAsync(), willCompleteSuccessfully());
assertThat(viewMgr.stopAsync(), willCompleteSuccessfully());
}
@Test
@SuppressWarnings("DataFlowIssue")
void managerDerivesViewPlacementFromLogicalTopologyEvents() {
String viewName = "MY_VIEW";
assertThat(viewMgr.owningNodes(viewName), empty());
List<String> allNodes = List.of("A", "B", "C");
LogicalTopologySnapshot topologySnapshot = topologySnapshot(viewName, allNodes, 0, 2);
viewMgr.onNodeJoined(first(topologySnapshot.nodes()), topologySnapshot);
assertThat(viewMgr.owningNodes(viewName), hasItem("A"));
assertThat(viewMgr.owningNodes(viewName), hasItem("C"));
topologySnapshot = topologySnapshot(viewName, allNodes, 0, 1);
viewMgr.onNodeLeft(first(topologySnapshot.nodes()), topologySnapshot);
assertThat(viewMgr.owningNodes(viewName), hasItem("A"));
assertThat(viewMgr.owningNodes(viewName), hasItem("B"));
topologySnapshot = topologySnapshot(viewName, allNodes, 1, 2);
viewMgr.onTopologyLeap(topologySnapshot);
assertThat(viewMgr.owningNodes(viewName), hasItem("B"));
assertThat(viewMgr.owningNodes(viewName), hasItem("C"));
}
@Test
void viewScanTest() {
when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
String nodeView = "NODE_VIEW";
String clusterView = "CLUSTER_VIEW";
class Pojo {
private final int c1;
private final int c2;
private Pojo(int c1, int c2) {
this.c1 = c1;
this.c2 = c2;
}
}
Publisher<Pojo> dataSet = fromIterable(List.of(new Pojo(1, 1), new Pojo(2, 2)));
viewMgr.register(() -> List.of(
SystemViews.<Pojo>nodeViewBuilder()
.name(nodeView)
.nodeNameColumnAlias("NODE")
.addColumn("C1", NativeTypes.INT32, p -> p.c1)
.addColumn("C2", NativeTypes.INT32, p -> p.c2)
.dataProvider(dataSet)
.build(),
SystemViews.<Pojo>clusterViewBuilder()
.name(clusterView)
.addColumn("C1", NativeTypes.INT32, p -> p.c1)
.addColumn("C2", NativeTypes.INT32, p -> p.c2)
.dataProvider(dataSet)
.build()
));
assertThat(viewMgr.startAsync(), willCompleteSuccessfully());
{
DrainAllSubscriber<InternalTuple> subs = new DrainAllSubscriber<>();
viewMgr.scanView(clusterView).subscribe(subs);
List<InternalTuple> entries = await(subs.completion);
assertThat(entries, hasSize(2));
assertThat(entries.get(0).intValue(0), equalTo(1));
assertThat(entries.get(0).intValue(1), equalTo(1));
assertThat(entries.get(1).intValue(0), equalTo(2));
assertThat(entries.get(1).intValue(1), equalTo(2));
}
{
DrainAllSubscriber<InternalTuple> subs = new DrainAllSubscriber<>();
viewMgr.scanView(nodeView).subscribe(subs);
List<InternalTuple> entries = await(subs.completion);
assertThat(entries, hasSize(2));
assertThat(entries.get(0).stringValue(0), equalTo(LOCAL_NODE_NAME));
assertThat(entries.get(0).intValue(1), equalTo(1));
assertThat(entries.get(0).intValue(2), equalTo(1));
assertThat(entries.get(1).stringValue(0), equalTo(LOCAL_NODE_NAME));
assertThat(entries.get(1).intValue(1), equalTo(2));
assertThat(entries.get(1).intValue(2), equalTo(2));
}
}
private static SystemView<?> dummyView(String name) {
return dummyView(name, NativeTypes.INT32);
}
private static <T> SystemView<T> dummyView(String name, NativeType type) {
return (SystemView<T>) SystemViews.nodeViewBuilder()
.nodeNameColumnAlias("NODE")
.name(name)
.addColumn("c1", type, (Function<Object, T>) Function.identity())
.dataProvider(fromIterable(List.of()))
.build();
}
private static LogicalTopologySnapshot topologySnapshot(String viewName, List<String> allNodes, int... owningNodes) {
BitSet owningNodesSet = new BitSet();
for (int idx : owningNodes) {
owningNodesSet.set(idx);
}
List<LogicalNode> topology = new ArrayList<>(allNodes.size());
for (int i = 0; i < allNodes.size(); i++) {
String name = allNodes.get(i);
ClusterNode clusterNode = new ClusterNodeImpl(name, name, new NetworkAddress("127.0.0.1", 1010 + i));
Map<String, String> systemAttributes;
if (owningNodesSet.get(i)) {
systemAttributes = Map.of(NODE_ATTRIBUTES_KEY, viewName);
} else {
systemAttributes = Map.of();
}
topology.add(new LogicalNode(clusterNode, Map.of(), systemAttributes, List.of()));
}
return new LogicalTopologySnapshot(1, topology);
}
static class DrainAllSubscriber<T> implements Subscriber<T> {
private final List<T> entries = new ArrayList<>();
CompletableFuture<List<T>> completion = new CompletableFuture<>();
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(T item) {
entries.add(item);
}
@Override
public void onError(Throwable throwable) {
completion.completeExceptionally(throwable);
}
@Override
public void onComplete() {
completion.complete(entries);
}
}
}