blob: 80838e5942b8777beafee36ec525c6dceb661f1b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.metastorage.impl;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
import static org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.NoOpFailureProcessor;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
import org.apache.ignite.internal.metastorage.dsl.Conditions;
import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
/**
* Integration tests for {@link MetaStorageManagerImpl}.
*/
@ExtendWith(ConfigurationExtension.class)
public class ItMetaStorageManagerImplTest extends IgniteAbstractTest {
private ClusterService clusterService;
private Loza raftManager;
private KeyValueStorage storage;
private MetaStorageManagerImpl metaStorageManager;
@BeforeEach
void setUp(
TestInfo testInfo,
@InjectConfiguration RaftConfiguration raftConfiguration,
@InjectConfiguration("mock.idleSyncTimeInterval = 100") MetaStorageConfiguration metaStorageConfiguration
) {
var addr = new NetworkAddress("localhost", 10_000);
clusterService = clusterService(testInfo, addr.port(), new StaticNodeFinder(List.of(addr)));
HybridClock clock = new HybridClockImpl();
var raftGroupEventsClientListener = new RaftGroupEventsClientListener();
raftManager = new Loza(clusterService, new NoOpMetricManager(), raftConfiguration, workDir.resolve("loza"), clock,
raftGroupEventsClientListener);
var logicalTopologyService = mock(LogicalTopologyService.class);
var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
clusterService,
logicalTopologyService,
Loza.FACTORY,
raftGroupEventsClientListener
);
ClusterManagementGroupManager cmgManager = mock(ClusterManagementGroupManager.class);
when(cmgManager.metaStorageNodes()).thenReturn(completedFuture(Set.of(clusterService.nodeName())));
storage = new RocksDbKeyValueStorage(
clusterService.nodeName(),
workDir.resolve("metastorage"),
new NoOpFailureProcessor(clusterService.nodeName()));
metaStorageManager = new MetaStorageManagerImpl(
clusterService,
cmgManager,
logicalTopologyService,
raftManager,
storage,
clock,
topologyAwareRaftGroupServiceFactory,
metaStorageConfiguration
);
assertThat(
startAsync(clusterService, raftManager, metaStorageManager)
.thenCompose(unused -> metaStorageManager.deployWatches()),
willCompleteSuccessfully()
);
}
@AfterEach
void tearDown() throws Exception {
List<IgniteComponent> components = List.of(metaStorageManager, raftManager, clusterService);
closeAll(Stream.concat(
components.stream().map(c -> c::beforeNodeStop),
Stream.of(() -> assertThat(stopAsync(components), willCompleteSuccessfully()))
));
}
/**
* Tests a corner case when a prefix request contains a max unsigned byte value.
*/
@Test
void testPrefixOverflow() {
byte[] value = "value".getBytes(UTF_8);
var key1 = new ByteArray(new byte[]{1, (byte) 0xFF, 0});
var key2 = new ByteArray(new byte[]{1, (byte) 0xFF, 1});
var key3 = new ByteArray(new byte[]{1, (byte) 0xFF, (byte) 0xFF});
// Contains the next lexicographical prefix
var key4 = new ByteArray(new byte[]{1, 0, 1});
// Contains the previous lexicographical prefix
var key5 = new ByteArray(new byte[]{1, (byte) 0xFE, 1});
CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke(
Conditions.notExists(new ByteArray("foo")),
List.of(
Operations.put(key1, value),
Operations.put(key2, value),
Operations.put(key3, value),
Operations.put(key4, value),
Operations.put(key5, value)
),
List.of(Operations.noop())
);
assertThat(invokeFuture, willBe(true));
CompletableFuture<List<byte[]>> actualKeysFuture =
subscribeToList(metaStorageManager.prefix(new ByteArray(new byte[]{1, (byte) 0xFF})))
.thenApply(entries -> entries.stream().map(Entry::key).collect(toList()));
assertThat(actualKeysFuture, will(contains(key1.bytes(), key2.bytes(), key3.bytes())));
}
@Test
void testMetaStorageStopClosesRaftService() throws Exception {
MetaStorageServiceImpl svc = metaStorageManager.metaStorageService().join();
assertThat(metaStorageManager.stopAsync(), willCompleteSuccessfully());
CompletableFuture<Entry> fut = svc.get(ByteArray.fromString("ignored"));
assertThat(fut, willThrowFast(CancellationException.class));
}
@Test
void testMetaStorageStopBeforeRaftServiceStarted() throws Exception {
assertThat(metaStorageManager.stopAsync(), willCompleteSuccessfully()); // Close MetaStorage that is created in setUp.
ClusterManagementGroupManager cmgManager = mock(ClusterManagementGroupManager.class);
Set<String> msNodes = Set.of(clusterService.nodeName());
CompletableFuture<Set<String>> cmgFut = new CompletableFuture<>();
when(cmgManager.metaStorageNodes()).thenReturn(cmgFut);
metaStorageManager = new MetaStorageManagerImpl(
clusterService,
cmgManager,
mock(LogicalTopologyService.class),
raftManager,
storage,
new HybridClockImpl(),
mock(TopologyAwareRaftGroupServiceFactory.class)
);
assertThat(metaStorageManager.stopAsync(), willCompleteSuccessfully());
// Unblock the future so raft service can be initialized. Although the future should be cancelled already by the
// stop method.
cmgFut.complete(msNodes);
assertThat(metaStorageManager.metaStorageService(), willThrowFast(CancellationException.class));
}
@Test
void testUpdateRevisionListener() {
ArgumentCaptor<Long> revisionCapture = ArgumentCaptor.forClass(Long.class);
RevisionUpdateListener listener = mock(RevisionUpdateListener.class);
when(listener.onUpdated(revisionCapture.capture())).thenReturn(nullCompletedFuture());
long revision = metaStorageManager.appliedRevision();
metaStorageManager.registerRevisionUpdateListener(listener);
assertThat(metaStorageManager.put(ByteArray.fromString("test"), "test".getBytes(UTF_8)), willSucceedFast());
// Watches are processed asynchronously.
// Timeout is big just in case there's a GC pause. Test's duration doesn't really depend on it.
verify(listener, timeout(5000).atLeast(1)).onUpdated(anyLong());
assertThat(revisionCapture.getAllValues(), is(List.of(revision + 1)));
}
/**
* Tests that idle safe time propagation does not advance safe time while watches of a normal command are being executed.
*/
@Test
void testIdleSafeTimePropagationAndNormalSafeTimePropagationInteraction(TestInfo testInfo) throws Exception {
var key = new ByteArray("foo");
byte[] value = "bar".getBytes(UTF_8);
AtomicBoolean watchCompleted = new AtomicBoolean(false);
CompletableFuture<HybridTimestamp> watchEventTsFuture = new CompletableFuture<>();
metaStorageManager.registerExactWatch(key, new WatchListener() {
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
watchEventTsFuture.complete(event.timestamp());
// The future will set the flag and complete after 300ms to allow idle safe time mechanism (which ticks each 100ms)
// to advance SafeTime (if there is still a bug for which this test is written).
return waitFor(300, TimeUnit.MILLISECONDS)
.whenComplete((res, ex) -> watchCompleted.set(true));
}
@Override
public void onError(Throwable e) {
}
});
metaStorageManager.put(key, value);
ClusterTime clusterTime = metaStorageManager.clusterTime();
assertThat(watchEventTsFuture, willSucceedIn(5, TimeUnit.SECONDS));
HybridTimestamp watchEventTs = watchEventTsFuture.join();
assertThat(clusterTime.waitFor(watchEventTs), willCompleteSuccessfully());
assertThat("Safe time is advanced too early", watchCompleted.get(), is(true));
}
private static CompletableFuture<Void> waitFor(int timeout, TimeUnit unit) {
return new CompletableFuture<Void>()
.orTimeout(timeout, unit)
.exceptionally(ex -> null);
}
}