/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.table;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toMap;
import static org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
import static org.apache.ignite.internal.schema.SchemaTestUtils.specToType;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.params.ParameterizedTest.ARGUMENTS_PLACEHOLDER;
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NullBinaryRow;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessage;
import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.distributed.storage.TableRaftServiceImpl;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.tx.test.TestTransactionIds;
import org.apache.ignite.internal.type.NativeTypeSpec;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.SingleClusterNodeResolver;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
/**
 * Tests for data colocation: operations on rows with equal colocation key values must be routed to the same partition.
 */
@ExtendWith(ConfigurationExtension.class)
public class ItColocationTest extends BaseIgniteAbstractTest {
/** Number of partitions. */
private static final int PARTS = 32;
/** Number of keys to check. */
private static final int KEYS = 100;
private static final HybridTimestampTracker observableTimestampTracker = new HybridTimestampTracker();
/** Dummy internal table for tests. */
private static InternalTable intTable;
private static TxManager txManager;
/** Map of RAFT commands sent by table operations, keyed by partition. */
private static final Int2ObjectMap<Set<Command>> CMDS_MAP = new Int2ObjectOpenHashMap<>();
/** Message factory used to build RAFT commands and replica messages. */
private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory();
@InjectConfiguration
private static TransactionConfiguration txConfiguration;
private SchemaDescriptor schema;
private SchemaRegistry schemaRegistry;
private TableViewInternal tbl;
private TupleMarshallerImpl marshaller;
@BeforeAll
static void beforeAllTests() {
ClusterNode clusterNode = DummyInternalTableImpl.LOCAL_NODE;
ClusterService clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS);
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
when(clusterService.topologyService().localMember()).thenReturn(clusterNode);
ReplicaService replicaService = mock(ReplicaService.class, RETURNS_DEEP_STUBS);
RemotelyTriggeredResourceRegistry resourcesRegistry = new RemotelyTriggeredResourceRegistry();
PlacementDriver placementDriver = new TestPlacementDriver(clusterNode);
HybridClock clock = new HybridClockImpl();
ClockService clockService = new TestClockService(clock);
TransactionInflights transactionInflights = new TransactionInflights(placementDriver, clockService);
txManager = new TxManagerImpl(
txConfiguration,
clusterService,
replicaService,
new HeapLockManager(),
clockService,
new TransactionIdGenerator(0xdeadbeef),
placementDriver,
() -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
new TestLocalRwTxCounter(),
resourcesRegistry,
transactionInflights,
new TestLowWatermark()
) {
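// No-op transaction finish: the test only verifies which commands reach the partition RAFT groups.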
@Override
public CompletableFuture<Void> finish(
HybridTimestampTracker observableTimestampTracker,
TablePartitionId commitPartition,
boolean commitIntent,
Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups,
UUID txId
) {
return nullCompletedFuture();
}
};
assertThat(txManager.startAsync(), willCompleteSuccessfully());
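// Mock a RAFT group service per partition; every command submitted to it is recorded in CMDS_MAP.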
Int2ObjectMap<RaftGroupService> partRafts = new Int2ObjectOpenHashMap<>();
Map<ReplicationGroupId, RaftGroupService> groupRafts = new HashMap<>();
int tblId = 1;
for (int i = 0; i < PARTS; ++i) {
RaftGroupService r = mock(RaftGroupService.class);
final int part = i;
doAnswer(invocation -> {
Command cmd = (Command) invocation.getArguments()[0];
CMDS_MAP.merge(part, new HashSet<>(Set.of(cmd)), (existing, added) -> {
existing.addAll(added);
return existing;
});
if (cmd instanceof UpdateAllCommand) {
return completedFuture(((UpdateAllCommand) cmd).rowsToUpdate().keySet().stream()
.map(uuid -> new NullBinaryRow())
.collect(Collectors.toList()));
} else {
return trueCompletedFuture();
}
}).when(r).run(any());
partRafts.put(i, r);
groupRafts.put(new TablePartitionId(tblId, i), r);
}
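// Convert incoming replica requests into UpdateCommand / UpdateAllCommand and run them on the RAFT service of the target partition.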
when(replicaService.invoke(any(ClusterNode.class), any())).thenAnswer(invocation -> {
ClusterNode node = invocation.getArgument(0);
ReplicaRequest request = invocation.getArgument(1);
var commitPartId = new TablePartitionId(2, 0);
RaftGroupService r = groupRafts.get(request.groupId());
if (request instanceof ReadWriteMultiRowReplicaRequest) {
ReadWriteMultiRowReplicaRequest multiRowReplicaRequest = (ReadWriteMultiRowReplicaRequest) request;
Map<UUID, TimedBinaryRowMessage> rows = multiRowReplicaRequest.binaryTuples().stream()
.collect(
toMap(tupleBuffer -> TestTransactionIds.newTransactionId(),
tupleBuffer -> MSG_FACTORY.timedBinaryRowMessage()
.binaryRowMessage(binaryRowMessage(tupleBuffer, multiRowReplicaRequest))
.build())
);
return r.run(MSG_FACTORY.updateAllCommand()
.tablePartitionId(MSG_FACTORY.tablePartitionIdMessage()
.tableId(commitPartId.tableId())
.partitionId(commitPartId.partitionId())
.build()
)
.messageRowsToUpdate(rows)
.txId(UUID.randomUUID())
.txCoordinatorId(node.id())
.build());
} else {
assertThat(request, is(instanceOf(ReadWriteSingleRowReplicaRequest.class)));
ReadWriteSingleRowReplicaRequest singleRowReplicaRequest = (ReadWriteSingleRowReplicaRequest) request;
return r.run(MSG_FACTORY.updateCommand()
.tablePartitionId(
MSG_FACTORY.tablePartitionIdMessage()
.tableId(commitPartId.tableId())
.partitionId(commitPartId.partitionId())
.build()
)
.rowUuid(UUID.randomUUID())
.messageRowToUpdate(MSG_FACTORY.timedBinaryRowMessage()
.binaryRowMessage(binaryRowMessage(singleRowReplicaRequest.binaryTuple(), singleRowReplicaRequest))
.build())
.txId(TestTransactionIds.newTransactionId())
.txCoordinatorId(node.id())
.build());
}
});
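// Internal table with PARTS partitions backed by the mocked replica service and the per-partition RAFT services.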
intTable = new InternalTableImpl(
"PUBLIC.TEST",
tblId,
PARTS,
new SingleClusterNodeResolver(clusterNode),
txManager,
mock(MvTableStorage.class),
new TestTxStateTableStorage(),
replicaService,
new HybridClockImpl(),
observableTimestampTracker,
new TestPlacementDriver(clusterNode),
new TableRaftServiceImpl("PUBLIC.TEST", PARTS, partRafts, new SingleClusterNodeResolver(clusterNode)),
transactionInflights,
3_000,
0,
null
);
}
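/**
 * Wraps a binary tuple buffer into a {@link BinaryRowMessage} using the schema version of the originating request.
 */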
private static BinaryRowMessage binaryRowMessage(ByteBuffer tupleBuffer, SchemaVersionAwareReplicaRequest request) {
return MSG_FACTORY.binaryRowMessage()
.schemaVersion(request.schemaVersion())
.binaryTuple(tupleBuffer)
.build();
}
@AfterAll
static void afterAllTests() {
if (txManager != null) {
assertThat(txManager.stopAsync(), willCompleteSuccessfully());
}
}
@BeforeEach
public void beforeTest() {
CMDS_MAP.clear();
}
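/**
 * Generates a deterministic value of the given native type from the key index.
 */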
private static Object generateValueByType(int i, NativeTypeSpec type) {
switch (type) {
case BOOLEAN:
return i % 2 == 0;
case INT8:
return (byte) i;
case INT16:
return (short) i;
case INT32:
return i;
case INT64:
return (long) i;
case FLOAT:
return (float) i + ((float) i / 1000);
case DOUBLE:
return (double) i + ((double) i / 1000);
case DECIMAL:
return BigDecimal.valueOf((double) i + ((double) i / 1000));
case UUID:
return new UUID(i, i);
case STRING:
return "str_" + i;
case BYTES:
return new byte[]{(byte) i, (byte) (i + 1), (byte) (i + 2)};
case BITMASK:
return BitSet.valueOf(new byte[]{(byte) i, (byte) (i + 1)});
case NUMBER:
return BigInteger.valueOf(i);
case DATE:
return LocalDate.of(2022, 1, 1).plusDays(i);
case TIME:
return LocalTime.of(0, 0, 0).plusSeconds(i);
case DATETIME:
return LocalDateTime.of(
(LocalDate) generateValueByType(i, NativeTypeSpec.DATE),
(LocalTime) generateValueByType(i, NativeTypeSpec.TIME)
);
case TIMESTAMP:
return ((LocalDateTime) generateValueByType(i, NativeTypeSpec.DATETIME))
.atZone(ZoneId.systemDefault())
.toInstant();
default:
throw new IllegalStateException("Unexpected type: " + type);
}
}
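/**
 * Provides all pairs of native types for the two colocation columns.
 */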
private static Stream<Arguments> twoColumnsParameters() {
List<Arguments> args = new ArrayList<>();
for (NativeTypeSpec t0 : NativeTypeSpec.values()) {
for (NativeTypeSpec t1 : NativeTypeSpec.values()) {
args.add(Arguments.of(t0, t1));
}
}
return args.stream();
}
/**
 * Checks colocation of single-row {@code insert} operations by two colocation columns for all type combinations.
 */
@ParameterizedTest(name = "types=" + ARGUMENTS_PLACEHOLDER)
@MethodSource("twoColumnsParameters")
public void colocationTwoColumnsInsert(NativeTypeSpec t0, NativeTypeSpec t1)
throws TupleMarshallerException {
init(t0, t1);
for (int i = 0; i < KEYS; ++i) {
CMDS_MAP.clear();
Tuple t = createTuple(i, t0, t1);
tbl.recordView().insert(null, t);
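// The recorded command must land on the partition computed from the marshalled row's colocation key.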
BinaryRowEx r = marshaller.marshal(t);
int part = intTable.partition(r);
assertThat(CollectionUtils.first(CMDS_MAP.get(part)), is(instanceOf(UpdateCommand.class)));
}
}
/**
 * Checks colocation of batch {@code insertAll} operations by two colocation columns for all type combinations.
 */
@ParameterizedTest(name = "types=" + ARGUMENTS_PLACEHOLDER)
@MethodSource("twoColumnsParameters")
public void colocationTwoColumnsInsertAll(NativeTypeSpec t0, NativeTypeSpec t1)
throws TupleMarshallerException {
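// With both key columns of type BOOLEAN only two distinct tuples exist, so limit the key count to avoid duplicates.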
int keysCount = t0 == NativeTypeSpec.BOOLEAN && t0 == t1 ? 2 : KEYS;
init(t0, t1);
tbl.recordView().insertAll(null, IntStream.range(0, keysCount).mapToObj(i -> createTuple(i, t0, t1)).collect(Collectors.toSet()));
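// Compute the expected per-partition row counts from the marshalled tuples.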
Int2IntMap partsMap = new Int2IntOpenHashMap();
for (int i = 0; i < keysCount; ++i) {
Tuple t = createTuple(i, t0, t1);
BinaryRowEx r = marshaller.marshal(t);
int part = intTable.partition(r);
partsMap.merge(part, 1, (cnt, ignore) -> ++cnt);
}
assertEquals(partsMap.size(), CMDS_MAP.size());
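// Each partition must receive a single UpdateAllCommand whose rows all map back to that partition.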
CMDS_MAP.forEach((p, set) -> {
UpdateAllCommand cmd = (UpdateAllCommand) CollectionUtils.first(set);
assertEquals(partsMap.get(p), cmd.rowsToUpdate().size(), () -> "part=" + p + ", set=" + set);
cmd.rowsToUpdate().values().forEach(rowMessage -> {
Row r = Row.wrapBinaryRow(schema, rowMessage.binaryRow());
assertEquals(intTable.partition(r), p);
});
});
}
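/**
 * Initializes the schema (colocated by {@code ID1}, {@code ID0}), schema registry, table and tuple marshaller for the given key column types.
 */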
private void init(NativeTypeSpec t0, NativeTypeSpec t1) {
schema = new SchemaDescriptor(1,
List.of(
new Column("ID", NativeTypes.INT64, false),
new Column("ID0", specToType(t0), false),
new Column("ID1", specToType(t1), false),
new Column("VAL", NativeTypes.INT64, true)
),
List.of("ID", "ID0", "ID1"),
List.of("ID1", "ID0")
);
schemaRegistry = new DummySchemaManagerImpl(schema);
tbl = new TableImpl(intTable, schemaRegistry, new HeapLockManager(), new ConstantSchemaVersions(1), mock(IgniteSql.class), -1);
marshaller = new TupleMarshallerImpl(schema);
}
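/**
 * Creates a record tuple whose colocation columns {@code ID0} and {@code ID1} are generated from the key index.
 */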
private Tuple createTuple(int k, NativeTypeSpec t0, NativeTypeSpec t1) {
return Tuple.create()
.set("ID", 1L)
.set("ID0", generateValueByType(k, t0))
.set("ID1", generateValueByType(k, t1))
.set("VAL", 0L);
}
}