blob: 184f3c6223e4326f0f6ba56c2baa303cd0690a75 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.distributionzones.rebalance;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition;
import static org.apache.ignite.internal.affinity.Assignments.toBytes;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.affinity.Assignments;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
import org.apache.ignite.internal.metastorage.command.MetaStorageWriteCommand;
import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
import org.apache.ignite.internal.metastorage.dsl.Iif;
import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.sql.ColumnType;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
/**
 * Tests for updating assignment in the meta storage.
 *
 * <p>Every test case seeds an in-memory key-value storage with the current stable, pending and planned assignments of a single
 * partition, calls {@code RebalanceUtil.updatePendingAssignmentsKeys} and then compares the three keys with the expected values.
 * The {@link MetaStorageManager} is a mock whose {@code invoke} call is routed through a mocked {@link RaftGroupService} straight
 * into a real {@link MetaStorageListener} that works on top of that storage.
 */
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class RebalanceUtilUpdateAssignmentsTest extends IgniteAbstractTest {
    private static final IgniteLogger LOG = Loggers.forClass(RebalanceUtilUpdateAssignmentsTest.class);

    private SimpleInMemoryKeyValueStorage keyValueStorage;

    private ClusterService clusterService;

    private MetaStorageManager metaStorageManager;

    private final CatalogTableDescriptor tableDescriptor = new CatalogTableDescriptor(
            1,
            -1,
            -1,
            "table1",
            0,
            List.of(new CatalogTableColumnDescriptor("k1", ColumnType.INT32, false, 0, 0, 0, null)),
            List.of("k1"),
            null,
            DEFAULT_STORAGE_PROFILE
    );

    private static final int partNum = 2;

    private static final int replicas = 2;

    private static final Set<String> nodes1 = nodeNames("nodes1_");

    private static final Set<String> nodes2 = nodeNames("nodes2_");

    private static final Set<String> nodes3 = nodeNames("nodes3_");

    private static final Set<String> nodes4 = nodeNames("nodes4_");

    private static final Set<Assignment> assignments1 = calculateAssignmentForPartition(nodes1, partNum, replicas);

    private static final Set<Assignment> assignments2 = calculateAssignmentForPartition(nodes2, partNum, replicas);

    private static final Set<Assignment> assignments3 = calculateAssignmentForPartition(nodes3, partNum, replicas);

    private static final Set<Assignment> assignments4 = calculateAssignmentForPartition(nodes4, partNum, replicas);

    /**
     * Builds the set of node names used as one test topology.
     *
     * <p>NOTE(review): {@code IntStream.of(5)} is a stream of the single value {@code 5}, so each set contains exactly one name
     * (for example {@code "nodes1_5"}). If five nodes per set were intended, this should be {@code IntStream.range(0, 5)}; confirm
     * before changing, because the expected assignments of every test are derived from these sets.
     *
     * @param prefix Prefix of the generated node names.
     * @return Set of node names.
     */
    private static Set<String> nodeNames(String prefix) {
        return IntStream.of(5).mapToObj(i -> prefix + i).collect(toSet());
    }

    /**
     * Creates the storage and wires the mocks so that {@code metaStorageManager.invoke(Iif)} ends up in a real
     * {@link MetaStorageListener} backed by {@link #keyValueStorage}.
     */
    @BeforeEach
    public void setUp() {
        clusterService = mock(ClusterService.class);

        metaStorageManager = mock(MetaStorageManager.class);

        AtomicLong raftIndex = new AtomicLong();

        keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));

        MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage, mock(ClusterTimeImpl.class));

        RaftGroupService metaStorageService = mock(RaftGroupService.class);

        // Delegate directly to listener.
        lenient().doAnswer(invocation -> {
            Command cmd = invocation.getArgument(0);

            return applyCommand(metaStorageListener, cmd, raftIndex.incrementAndGet());
        }).when(metaStorageService).run(any());

        MetaStorageCommandsFactory commandsFactory = new MetaStorageCommandsFactory();

        // Wrap the condition into a multi-invoke command and send it through the mocked raft service.
        lenient().doAnswer(invocation -> {
            Iif iif = invocation.getArgument(0);

            MultiInvokeCommand multiInvokeCommand = commandsFactory.multiInvokeCommand().iif(iif).build();

            return metaStorageService.run(multiInvokeCommand);
        }).when(metaStorageManager).invoke(any());

        // A fresh messaging service mock is returned on every call.
        when(clusterService.messagingService()).thenAnswer(invocation -> mock(MessagingService.class));
    }

    /**
     * Passes a single command to the listener's {@code onWrite} and exposes the outcome as a future.
     *
     * @param listener Listener that handles the command.
     * @param cmd Command to apply. It is cast to {@link WriteCommand} when the listener asks for it.
     * @param commandIndex Index reported by the closure for this command.
     * @return Future completed with the value the listener passes to the closure; completed exceptionally if that value is a
     *         {@link Throwable} or if {@code onWrite} throws.
     */
    private static CompletableFuture<Serializable> applyCommand(MetaStorageListener listener, Command cmd, long commandIndex) {
        if (cmd instanceof MetaStorageWriteCommand) {
            // The same fixed safe time value is set on every meta storage write command.
            ((MetaStorageWriteCommand) cmd).safeTimeLong(10);
        }

        CompletableFuture<Serializable> res = new CompletableFuture<>();

        CommandClosure<WriteCommand> clo = new CommandClosure<>() {
            @Override
            public long index() {
                return commandIndex;
            }

            @Override
            public WriteCommand command() {
                return (WriteCommand) cmd;
            }

            @Override
            public void result(@Nullable Serializable r) {
                if (r instanceof Throwable) {
                    res.completeExceptionally((Throwable) r);
                } else {
                    res.complete(r);
                }
            }
        };

        try {
            listener.onWrite(List.of(clo).iterator());
        } catch (Throwable e) {
            res.completeExceptionally(new IgniteInternalException(e));
        }

        return res;
    }

    @AfterEach
    public void tearDown() {
        keyValueStorage.close();
    }

    /**
     * Nodes for new assignments calculating: nodes1.
     * The table configuration assignments: assignments2.
     * Current assignments in the metastorage: stable=null, pending=null, planned=null.
     * Expected assignments in the metastorage after updating: stable=null, pending=assignments1, planned=null.
     */
    @Test
    void test1() {
        test(
                nodes1, assignments2,
                null, null, null,
                null, assignments1, null
        );
    }

    /**
     * Nodes for new assignments calculating: nodes1.
     * The table configuration assignments: assignments1.
     * Current assignments in the metastorage: stable=null, pending=null, planned=null.
     * Expected assignments in the metastorage after updating: stable=null, pending=null, planned=null.
     */
    @Test
    void test2() {
        test(
                nodes1, assignments1,
                null, null, null,
                null, null, null
        );
    }

    /**
     * Nodes for new assignments calculating: nodes1.
     * The table configuration assignments: assignments2.
     * Current assignments in the metastorage: stable=null, pending=assignments3, planned=null.
     * Expected assignments in the metastorage after updating: stable=null, pending=assignments3, planned=assignments1.
     */
    @Test
    void test3() {
        test(
                nodes1, assignments2,
                null, assignments3, null,
                null, assignments3, assignments1
        );
    }

    /**
     * Nodes for new assignments calculating: nodes1.
     * The table configuration assignments: assignments1.
     * Current assignments in the metastorage: stable=null, pending=assignments3, planned=null.
     * Expected assignments in the metastorage after updating: stable=null, pending=assignments3, planned=assignments1.
     */
    @Test
    void test4() {
        test(
                nodes1, assignments1,
                null, assignments3, null,
                null, assignments3, assignments1
        );
    }

    /**
     * Nodes for new assignments calculating: nodes1.
     * The table configuration assignments: assignments2.
     * Current assignments in the metastorage: stable=assignments3, pending=null, planned=null.
     * Expected assignments in the metastorage after updating: stable=assignments3, pending=assignments1, planned=null.
     */
    @Test
    void test5() {
        test(
                nodes1, assignments2,
                assignments3, null, null,
                assignments3, assignments1, null
        );
    }

    /**
     * Nodes for new assignments calculating: nodes1.
     * The table configuration assignments: assignments1.
     * Current assignments in the metastorage: stable=assignments3, pending=null, planned=null.
     * Expected assignments in the metastorage after updating: stable=assignments3, pending=assignments1, planned=null.
     */
    @Test
    void test6() {
        test(
                nodes1, assignments1,
                assignments3, null, null,
                assignments3, assignments1, null
        );
    }

    /**
     * Nodes for new assignments calculating: nodes1.
     * The table configuration assignments: assignments2.
     * Current assignments in the metastorage: stable=assignments1, pending=null, planned=null.
     * Expected assignments in the metastorage after updating: stable=assignments1, pending=null, planned=null.
     */
    @Test
    void test7() {
        test(
                nodes1, assignments2,
                assignments1, null, null,
                assignments1, null, null
        );
    }

    /**
     * Nodes for new assignments calculating: nodes1.
     * The table configuration assignments: assignments1.
     * Current assignments in the metastorage: stable=assignments1, pending=null, planned=null.
     * Expected assignments in the metastorage after updating: stable=assignments1, pending=null, planned=null.
     */
    @Test
    void test8() {
        test(
                nodes1, assignments1,
                assignments1, null, null,
                assignments1, null, null
        );
    }

    /**
     * Nodes for new assignments calculating: nodes1.
     * The table configuration assignments: assignments2.
     * Current assignments in the metastorage: stable=assignments2, pending=null, planned=null.
     * Expected assignments in the metastorage after updating: stable=assignments2, pending=assignments1, planned=null.
     */
    @Test
    void test9() {
        test(
                nodes1, assignments2,
                assignments2, null, null,
                assignments2, assignments1, null
        );
    }

    /**
     * Nodes for new assignments calculating: nodes1.
     * The table configuration assignments: assignments2.
     * Current assignments in the metastorage: stable=assignments4, pending=assignments3, planned=null.
     * Expected assignments in the metastorage after updating: stable=assignments4, pending=assignments3, planned=assignments1.
     */
    @Test
    void test10() {
        test(
                nodes1, assignments2,
                assignments4, assignments3, null,
                assignments4, assignments3, assignments1
        );
    }

    /**
     * Nodes for new assignments calculating: nodes1.
     * The table configuration assignments: assignments1.
     * Current assignments in the metastorage: stable=assignments3, pending=assignments2, planned=null.
     * Expected assignments in the metastorage after updating: stable=assignments3, pending=assignments2, planned=assignments1.
     */
    @Test
    void test11() {
        test(
                nodes1, assignments1,
                assignments3, assignments2, null,
                assignments3, assignments2, assignments1
        );
    }

    /**
     * Nodes for new assignments calculating: nodes1.
     * The table configuration assignments: assignments2.
     * Current assignments in the metastorage: stable=assignments1, pending=assignments3, planned=null.
     * Expected assignments in the metastorage after updating: stable=assignments1, pending=assignments3, planned=assignments1.
     */
    @Test
    void test12() {
        test(
                nodes1, assignments2,
                assignments1, assignments3, null,
                assignments1, assignments3, assignments1
        );
    }

    /**
     * Nodes for new assignments calculating: nodes1.
     * The table configuration assignments: assignments2.
     * Current assignments in the metastorage: stable=assignments2, pending=assignments3, planned=null.
     * Expected assignments in the metastorage after updating: stable=assignments2, pending=assignments3, planned=assignments1.
     */
    @Test
    void test13() {
        test(
                nodes1, assignments2,
                assignments2, assignments3, null,
                assignments2, assignments3, assignments1
        );
    }

    /**
     * Nodes for new assignments calculating: nodes1.
     * The table configuration assignments: assignments1.
     * Current assignments in the metastorage: stable=assignments1, pending=assignments2, planned=null.
     * Expected assignments in the metastorage after updating: stable=assignments1, pending=assignments2, planned=assignments1.
     */
    @Test
    void test14() {
        test(
                nodes1, assignments1,
                assignments1, assignments2, null,
                assignments1, assignments2, assignments1
        );
    }

    /**
     * Nodes for new assignments calculating: nodes1.
     * The table configuration assignments: assignments1.
     * Current assignments in the metastorage: stable=assignments1, pending=assignments2, planned=assignments3.
     * Expected assignments in the metastorage after updating: stable=assignments1, pending=assignments2, planned=assignments1.
     */
    @Test
    void test15() {
        test(
                nodes1, assignments1,
                assignments1, assignments2, assignments3,
                assignments1, assignments2, assignments1
        );
    }

    /**
     * Nodes for new assignments calculating: nodes1.
     * The table configuration assignments: assignments4.
     * Current assignments in the metastorage: stable=assignments1, pending=assignments2, planned=assignments1.
     * Expected assignments in the metastorage after updating: stable=assignments1, pending=assignments2, planned=assignments1.
     */
    @Test
    void test16() {
        test(
                nodes1, assignments4,
                assignments1, assignments2, assignments1,
                assignments1, assignments2, assignments1
        );
    }

    /**
     * Nodes for new assignments calculating: nodes2.
     * The table configuration assignments: assignments2.
     * Current assignments in the metastorage: stable=assignments1, pending=assignments2, planned=assignments1.
     * Expected assignments in the metastorage after updating: stable=assignments1, pending=assignments2, planned=null.
     */
    @Test
    void test17() {
        test(
                nodes2, assignments2,
                assignments1, assignments2, assignments1,
                assignments1, assignments2, null
        );
    }

    /**
     * Nodes for new assignments calculating: nodes2.
     * The table configuration assignments: assignments4.
     * Current assignments in the metastorage: stable=assignments1, pending=assignments2, planned=assignments1.
     * Expected assignments in the metastorage after updating: stable=assignments1, pending=assignments2, planned=null.
     */
    @Test
    void test18() {
        test(
                nodes2, assignments4,
                assignments1, assignments2, assignments1,
                assignments1, assignments2, null
        );
    }

    /**
     * Runs one scenario: seeds the storage, calls {@code RebalanceUtil.updatePendingAssignmentsKeys} and checks the resulting keys.
     *
     * <p>A {@code null} "current" value means the key is not written before the call; a {@code null} "expected" value means the key
     * must have no value after the call.
     *
     * <p>NOTE(review): the storage is read right after the call and the call's result is not awaited; this presumably relies on the
     * mocks from {@link #setUp()} applying the command on the calling thread - confirm.
     *
     * @param nodesForNewAssignments Node names passed to the method under test for calculating new assignments.
     * @param tableCfgAssignments Table configuration assignments passed to the method under test.
     * @param currentStableAssignments Stable assignments to seed, or {@code null}.
     * @param currentPendingAssignments Pending assignments to seed, or {@code null}.
     * @param currentPlannedAssignments Planned assignments to seed, or {@code null}.
     * @param expectedStableAssignments Expected stable assignments, or {@code null}.
     * @param expectedPendingAssignments Expected pending assignments, or {@code null}.
     * @param expectedPlannedAssignments Expected planned assignments, or {@code null}.
     */
    private void test(
            Collection<String> nodesForNewAssignments,
            Set<Assignment> tableCfgAssignments,
            Set<Assignment> currentStableAssignments,
            Set<Assignment> currentPendingAssignments,
            Set<Assignment> currentPlannedAssignments,
            Set<Assignment> expectedStableAssignments,
            Set<Assignment> expectedPendingAssignments,
            Set<Assignment> expectedPlannedAssignments
    ) {
        TablePartitionId tablePartitionId = new TablePartitionId(1, 1);

        byte[] stableKey = RebalanceUtil.stablePartAssignmentsKey(tablePartitionId).bytes();
        byte[] pendingKey = RebalanceUtil.pendingPartAssignmentsKey(tablePartitionId).bytes();
        byte[] plannedKey = RebalanceUtil.plannedPartAssignmentsKey(tablePartitionId).bytes();

        putIfPresent(stableKey, currentStableAssignments);
        putIfPresent(pendingKey, currentPendingAssignments);
        putIfPresent(plannedKey, currentPlannedAssignments);

        RebalanceUtil.updatePendingAssignmentsKeys(
                tableDescriptor, tablePartitionId, nodesForNewAssignments,
                replicas, 1, metaStorageManager, partNum, tableCfgAssignments
        );

        byte[] actualStableBytes = keyValueStorage.get(stableKey).value();
        byte[] actualPendingBytes = keyValueStorage.get(pendingKey).value();
        byte[] actualPlannedBytes = keyValueStorage.get(plannedKey).value();

        Set<Assignment> actualStableAssignments = decode(actualStableBytes);
        Set<Assignment> actualPendingAssignments = decode(actualPendingBytes);
        Set<Assignment> actualPlannedAssignments = decode(actualPlannedBytes);

        // Log all three values before asserting so that a failure on one of them still shows the whole picture.
        LOG.info("stableAssignments " + actualStableAssignments);
        LOG.info("pendingAssignments " + actualPendingAssignments);
        LOG.info("plannedAssignments " + actualPlannedAssignments);

        assertAssignments("stable", expectedStableAssignments, actualStableBytes, actualStableAssignments);
        assertAssignments("pending", expectedPendingAssignments, actualPendingBytes, actualPendingAssignments);
        assertAssignments("planned", expectedPlannedAssignments, actualPlannedBytes, actualPlannedAssignments);
    }

    /**
     * Writes the serialized assignments under the given key; does nothing when {@code assignments} is {@code null}.
     *
     * @param key Key bytes.
     * @param assignments Assignments to store, or {@code null} to leave the key untouched.
     */
    private void putIfPresent(byte[] key, @Nullable Set<Assignment> assignments) {
        if (assignments != null) {
            keyValueStorage.put(key, toBytes(assignments), HybridTimestamp.MIN_VALUE);
        }
    }

    /**
     * Deserializes a stored value into a set of assignments.
     *
     * @param bytes Stored value, may be {@code null}.
     * @return Decoded assignments, or {@code null} when {@code bytes} is {@code null}.
     */
    private static @Nullable Set<Assignment> decode(byte[] bytes) {
        return bytes == null ? null : Assignments.fromBytes(bytes).nodes();
    }

    /**
     * Checks one assignments key: when nothing is expected the raw value must be {@code null}, otherwise the value must exist and
     * decode to the expected set.
     *
     * @param name Kind of assignments, used only in assertion messages.
     * @param expected Expected assignments, or {@code null} if the key must have no value.
     * @param actualBytes Raw value read from the storage, may be {@code null}.
     * @param actual Assignments decoded from {@code actualBytes}, may be {@code null}.
     */
    private static void assertAssignments(
            String name,
            @Nullable Set<Assignment> expected,
            byte[] actualBytes,
            @Nullable Set<Assignment> actual
    ) {
        if (expected == null) {
            assertNull(actualBytes, name + " assignments must be absent");

            return;
        }

        assertNotNull(actualBytes, name + " assignments must be present");
        assertEquals(expected, actual, name + " assignments");
    }
}