blob: 8ac8a7156153ed3ac78ce1becb69d74e71ba5256 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.client;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import io.netty.util.ResourceLeakDetector;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.ignite.Ignite;
import org.apache.ignite.client.AbstractClientTableTest.PersonPojo;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.client.fakes.FakeIgniteTables;
import org.apache.ignite.client.fakes.FakeInternalTable;
import org.apache.ignite.client.handler.FakePlacementDriver;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.internal.client.ReliableChannel;
import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.streamer.SimplePublisher;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.table.DataStreamerItem;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
/**
* Tests partition awareness.
*/
public class PartitionAwarenessTest extends AbstractClientTest {
private static final String nodeKey0 = "server-2";
private static final String nodeKey1 = "server-2";
private static final String nodeKey2 = "server-1";
private static final String nodeKey3 = "server-2";
private static TestServer testServer2;
private static Ignite server2;
private static IgniteClient client2;
private volatile @Nullable String lastOp;
private volatile @Nullable String lastOpServerName;
private static final AtomicInteger nextTableId = new AtomicInteger(101);
/**
* Before all.
*/
@BeforeAll
public static void startServer2() {
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
server2 = new FakeIgnite("server-2");
testServer2 = new TestServer(0, server2, null, null, "server-2", clusterId, null, null);
var clientBuilder = IgniteClient.builder()
.addresses("127.0.0.1:" + serverPort, "127.0.0.1:" + testServer2.port())
.heartbeatInterval(200);
client2 = clientBuilder.build();
}
/**
* After all.
*/
@AfterAll
public static void stopServer2() throws Exception {
closeAll(client2, testServer2);
}
@BeforeEach
public void initReplicas() throws InterruptedException {
dropTables(server2);
initPrimaryReplicas(null);
assertTrue(IgniteTestUtils.waitForCondition(() -> client2.connections().size() == 2, 3000));
}
@Test
public void testGetTupleRoutesRequestToPrimaryNode() {
RecordView<Tuple> recordView = defaultTable().recordView();
assertOpOnNode(nodeKey0, "get", tx -> recordView.get(tx, Tuple.create().set("ID", 0L)));
assertOpOnNode(nodeKey1, "get", tx -> recordView.get(tx, Tuple.create().set("ID", 1L)));
assertOpOnNode(nodeKey2, "get", tx -> recordView.get(tx, Tuple.create().set("ID", 2L)));
assertOpOnNode(nodeKey3, "get", tx -> recordView.get(tx, Tuple.create().set("ID", 3L)));
}
@Test
public void testGetRecordRoutesRequestToPrimaryNode() {
RecordView<PersonPojo> pojoView = defaultTable().recordView(Mapper.of(PersonPojo.class));
assertOpOnNode(nodeKey0, "get", tx -> pojoView.get(tx, new PersonPojo(0L)));
assertOpOnNode(nodeKey1, "get", tx -> pojoView.get(tx, new PersonPojo(1L)));
assertOpOnNode(nodeKey2, "get", tx -> pojoView.get(tx, new PersonPojo(2L)));
assertOpOnNode(nodeKey3, "get", tx -> pojoView.get(tx, new PersonPojo(3L)));
}
@Test
public void testGetKeyValueRoutesRequestToPrimaryNode() {
KeyValueView<Long, String> kvView = defaultTable().keyValueView(Mapper.of(Long.class), Mapper.of(String.class));
assertOpOnNode(nodeKey0, "get", tx -> kvView.get(tx, 0L));
assertOpOnNode(nodeKey1, "get", tx -> kvView.get(tx, 1L));
assertOpOnNode(nodeKey2, "get", tx -> kvView.get(tx, 2L));
assertOpOnNode(nodeKey3, "get", tx -> kvView.get(tx, 3L));
}
@Test
public void testGetKeyValueBinaryRoutesRequestToPrimaryNode() {
KeyValueView<Tuple, Tuple> kvView = defaultTable().keyValueView();
assertOpOnNode(nodeKey0, "get", tx -> kvView.get(tx, Tuple.create().set("ID", 0L)));
assertOpOnNode(nodeKey1, "get", tx -> kvView.get(tx, Tuple.create().set("ID", 1L)));
assertOpOnNode(nodeKey2, "get", tx -> kvView.get(tx, Tuple.create().set("ID", 2L)));
assertOpOnNode(nodeKey3, "get", tx -> kvView.get(tx, Tuple.create().set("ID", 3L)));
}
@Test
public void testNonNullTxDisablesPartitionAwareness() {
RecordView<Tuple> recordView = defaultTable().recordView();
var tx = (ClientLazyTransaction) client2.transactions().begin();
client2.sql().execute(tx, "SELECT 1").close(); // Force lazy tx init.
String expectedNode = tx.nodeName();
assertNotNull(expectedNode);
assertOpOnNode(expectedNode, "get", tx2 -> recordView.get(tx, Tuple.create().set("ID", 0L)));
assertOpOnNode(expectedNode, "get", tx2 -> recordView.get(tx, Tuple.create().set("ID", 1L)));
assertOpOnNode(expectedNode, "get", tx2 -> recordView.get(tx, Tuple.create().set("ID", 2L)));
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testClientReceivesPartitionAssignmentUpdates(boolean useHeartbeat) throws InterruptedException {
ReliableChannel ch = IgniteTestUtils.getFieldValue(client2, "ch");
// Check default assignment.
RecordView<Tuple> recordView = defaultTable().recordView();
assertOpOnNode(nodeKey1, "get", tx -> recordView.get(tx, Tuple.create().set("ID", 1L)));
assertOpOnNode(nodeKey2, "get", tx -> recordView.get(tx, Tuple.create().set("ID", 2L)));
// Update partition assignment.
var oldTs = ch.partitionAssignmentTimestamp();
initPrimaryReplicas(reversedReplicas());
if (useHeartbeat) {
// Wait for heartbeat message to receive change notification flag.
assertTrue(IgniteTestUtils.waitForCondition(() -> ch.partitionAssignmentTimestamp() > oldTs, 3000));
} else {
// Perform requests to receive change notification flag.
int maxRequests = 50;
while (ch.partitionAssignmentTimestamp() <= oldTs && maxRequests-- > 0) {
client2.tables().tables();
}
assertThat("Failed to receive assignment update", maxRequests, greaterThan(0));
}
// Check new assignment.
assertThat(ch.partitionAssignmentTimestamp(), greaterThan(oldTs));
assertOpOnNode(nodeKey2, "get", tx -> recordView.get(tx, Tuple.create().set("ID", 1L)));
assertOpOnNode(nodeKey1, "get", tx -> recordView.get(tx, Tuple.create().set("ID", 2L)));
}
@Test
public void testCustomColocationKey() {
RecordView<Tuple> recordView = table(FakeIgniteTables.TABLE_COLOCATION_KEY).recordView();
assertOpOnNode("server-2", "get", tx -> recordView.get(tx, Tuple.create().set("ID", 0).set("COLO-1", "0").set("COLO-2", 4L)));
assertOpOnNode("server-1", "get", tx -> recordView.get(tx, Tuple.create().set("ID", 0).set("COLO-1", "0").set("COLO-2", 8L)));
}
@Test
public void testCompositeKey() {
RecordView<Tuple> recordView = table(FakeIgniteTables.TABLE_COMPOSITE_KEY).recordView();
assertOpOnNode("server-2", "get", tx -> recordView.get(tx, Tuple.create().set("ID1", 0).set("ID2", "0")));
assertOpOnNode("server-1", "get", tx -> recordView.get(tx, Tuple.create().set("ID1", 1).set("ID2", "0")));
assertOpOnNode("server-2", "get", tx -> recordView.get(tx, Tuple.create().set("ID1", 0).set("ID2", "1")));
assertOpOnNode("server-1", "get", tx -> recordView.get(tx, Tuple.create().set("ID1", 1).set("ID2", "1")));
assertOpOnNode("server-2", "get", tx -> recordView.get(tx, Tuple.create().set("ID1", 1).set("ID2", "2")));
}
@Test
public void testAllRecordViewOperations() {
RecordView<PersonPojo> pojoView = defaultTable().recordView(
Mapper.of(PersonPojo.class));
var t1 = new PersonPojo(0L);
var t2 = new PersonPojo(1L);
assertOpOnNode(nodeKey0, "insert", tx -> pojoView.insert(tx, t1));
assertOpOnNode(nodeKey1, "insert", tx -> pojoView.insert(tx, t2));
assertOpOnNode(nodeKey0, "insertAll", tx -> pojoView.insertAll(tx, List.of(t1)));
assertOpOnNode(nodeKey1, "insertAll", tx -> pojoView.insertAll(tx, List.of(t2)));
assertOpOnNode(nodeKey0, "upsert", tx -> pojoView.upsert(tx, t1));
assertOpOnNode(nodeKey1, "upsert", tx -> pojoView.upsert(tx, t2));
assertOpOnNode(nodeKey0, "upsertAll", tx -> pojoView.upsertAll(tx, List.of(t1)));
assertOpOnNode(nodeKey1, "upsertAll", tx -> pojoView.upsertAll(tx, List.of(t2)));
assertOpOnNode(nodeKey0, "get", tx -> pojoView.get(tx, t1));
assertOpOnNode(nodeKey1, "get", tx -> pojoView.get(tx, t2));
assertOpOnNode(nodeKey0, "getAll", tx -> pojoView.getAll(tx, List.of(t1)));
assertOpOnNode(nodeKey1, "getAll", tx -> pojoView.getAll(tx, List.of(t2)));
assertOpOnNode(nodeKey0, "getAndUpsert", tx -> pojoView.getAndUpsert(tx, t1));
assertOpOnNode(nodeKey1, "getAndUpsert", tx -> pojoView.getAndUpsert(tx, t2));
assertOpOnNode(nodeKey0, "getAndReplace", tx -> pojoView.getAndReplace(tx, t1));
assertOpOnNode(nodeKey1, "getAndReplace", tx -> pojoView.getAndReplace(tx, t2));
assertOpOnNode(nodeKey0, "getAndDelete", tx -> pojoView.getAndDelete(tx, t1));
assertOpOnNode(nodeKey1, "getAndDelete", tx -> pojoView.getAndDelete(tx, t2));
assertOpOnNode(nodeKey0, "replace", tx -> pojoView.replace(tx, t1));
assertOpOnNode(nodeKey1, "replace", tx -> pojoView.replace(tx, t2));
assertOpOnNode(nodeKey0, "replace", tx -> pojoView.replace(tx, t1, t1));
assertOpOnNode(nodeKey1, "replace", tx -> pojoView.replace(tx, t2, t2));
assertOpOnNode(nodeKey0, "delete", tx -> pojoView.delete(tx, t1));
assertOpOnNode(nodeKey1, "delete", tx -> pojoView.delete(tx, t2));
assertOpOnNode(nodeKey0, "deleteExact", tx -> pojoView.deleteExact(tx, t1));
assertOpOnNode(nodeKey1, "deleteExact", tx -> pojoView.deleteExact(tx, t2));
assertOpOnNode(nodeKey0, "deleteAll", tx -> pojoView.deleteAll(tx, List.of(t1)));
assertOpOnNode(nodeKey1, "deleteAll", tx -> pojoView.deleteAll(tx, List.of(t2)));
assertOpOnNode(nodeKey0, "deleteAllExact", tx -> pojoView.deleteAllExact(tx, List.of(t1)));
assertOpOnNode(nodeKey1, "deleteAllExact", tx -> pojoView.deleteAllExact(tx, List.of(t2)));
}
@Test
public void testAllRecordBinaryViewOperations() {
RecordView<Tuple> recordView = defaultTable().recordView();
Tuple t1 = Tuple.create().set("ID", 1L);
Tuple t2 = Tuple.create().set("ID", 2L);
assertOpOnNode(nodeKey1, "insert", tx -> recordView.insert(tx, t1));
assertOpOnNode(nodeKey2, "insert", tx -> recordView.insert(tx, t2));
assertOpOnNode(nodeKey1, "insertAll", tx -> recordView.insertAll(tx, List.of(t1)));
assertOpOnNode(nodeKey2, "insertAll", tx -> recordView.insertAll(tx, List.of(t2)));
assertOpOnNode(nodeKey1, "upsert", tx -> recordView.upsert(tx, t1));
assertOpOnNode(nodeKey2, "upsert", tx -> recordView.upsert(tx, t2));
assertOpOnNode(nodeKey1, "upsertAll", tx -> recordView.upsertAll(tx, List.of(t1)));
assertOpOnNode(nodeKey2, "upsertAll", tx -> recordView.upsertAll(tx, List.of(t2)));
assertOpOnNode(nodeKey1, "get", tx -> recordView.get(tx, t1));
assertOpOnNode(nodeKey2, "get", tx -> recordView.get(tx, t2));
assertOpOnNode(nodeKey1, "getAll", tx -> recordView.getAll(tx, List.of(t1)));
assertOpOnNode(nodeKey2, "getAll", tx -> recordView.getAll(tx, List.of(t2)));
assertOpOnNode(nodeKey1, "getAndUpsert", tx -> recordView.getAndUpsert(tx, t1));
assertOpOnNode(nodeKey2, "getAndUpsert", tx -> recordView.getAndUpsert(tx, t2));
assertOpOnNode(nodeKey1, "getAndReplace", tx -> recordView.getAndReplace(tx, t1));
assertOpOnNode(nodeKey2, "getAndReplace", tx -> recordView.getAndReplace(tx, t2));
assertOpOnNode(nodeKey1, "getAndDelete", tx -> recordView.getAndDelete(tx, t1));
assertOpOnNode(nodeKey2, "getAndDelete", tx -> recordView.getAndDelete(tx, t2));
assertOpOnNode(nodeKey1, "replace", tx -> recordView.replace(tx, t1));
assertOpOnNode(nodeKey2, "replace", tx -> recordView.replace(tx, t2));
assertOpOnNode(nodeKey1, "replace", tx -> recordView.replace(tx, t1, t1));
assertOpOnNode(nodeKey2, "replace", tx -> recordView.replace(tx, t2, t2));
assertOpOnNode(nodeKey1, "delete", tx -> recordView.delete(tx, t1));
assertOpOnNode(nodeKey2, "delete", tx -> recordView.delete(tx, t2));
assertOpOnNode(nodeKey1, "deleteExact", tx -> recordView.deleteExact(tx, t1));
assertOpOnNode(nodeKey2, "deleteExact", tx -> recordView.deleteExact(tx, t2));
assertOpOnNode(nodeKey1, "deleteAll", tx -> recordView.deleteAll(tx, List.of(t1)));
assertOpOnNode(nodeKey2, "deleteAll", tx -> recordView.deleteAll(tx, List.of(t2)));
assertOpOnNode(nodeKey1, "deleteAllExact", tx -> recordView.deleteAllExact(tx, List.of(t1)));
assertOpOnNode(nodeKey2, "deleteAllExact", tx -> recordView.deleteAllExact(tx, List.of(t2)));
}
@Test
public void testAllKeyValueViewOperations() {
KeyValueView<Long, String> kvView = defaultTable().keyValueView(Mapper.of(Long.class), Mapper.of(String.class));
var k1 = 1L;
var k2 = 2L;
var v = "v";
assertOpOnNode(nodeKey1, "insert", tx -> kvView.putIfAbsent(tx, k1, v));
assertOpOnNode(nodeKey2, "insert", tx -> kvView.putIfAbsent(tx, k2, v));
assertOpOnNode(nodeKey1, "upsert", tx -> kvView.put(tx, k1, v));
assertOpOnNode(nodeKey2, "upsert", tx -> kvView.put(tx, k2, v));
assertOpOnNode(nodeKey1, "upsertAll", tx -> kvView.putAll(tx, Map.of(k1, v)));
assertOpOnNode(nodeKey2, "upsertAll", tx -> kvView.putAll(tx, Map.of(k2, v)));
assertOpOnNode(nodeKey1, "get", tx -> kvView.get(tx, k1));
assertOpOnNode(nodeKey2, "get", tx -> kvView.get(tx, k2));
assertOpOnNode(nodeKey1, "get", tx -> kvView.contains(tx, k1));
assertOpOnNode(nodeKey2, "get", tx -> kvView.contains(tx, k2));
assertOpOnNode(nodeKey1, "getAll", tx -> kvView.getAll(tx, List.of(k1)));
assertOpOnNode(nodeKey2, "getAll", tx -> kvView.getAll(tx, List.of(k2)));
assertOpOnNode(nodeKey1, "getAndUpsert", tx -> kvView.getAndPut(tx, k1, v));
assertOpOnNode(nodeKey2, "getAndUpsert", tx -> kvView.getAndPut(tx, k2, v));
assertOpOnNode(nodeKey1, "getAndReplace", tx -> kvView.getAndReplace(tx, k1, v));
assertOpOnNode(nodeKey2, "getAndReplace", tx -> kvView.getAndReplace(tx, k2, v));
assertOpOnNode(nodeKey1, "getAndDelete", tx -> kvView.getAndRemove(tx, k1));
assertOpOnNode(nodeKey2, "getAndDelete", tx -> kvView.getAndRemove(tx, k2));
assertOpOnNode(nodeKey1, "replace", tx -> kvView.replace(tx, k1, v));
assertOpOnNode(nodeKey2, "replace", tx -> kvView.replace(tx, k2, v));
assertOpOnNode(nodeKey1, "replace", tx -> kvView.replace(tx, k1, v, v));
assertOpOnNode(nodeKey2, "replace", tx -> kvView.replace(tx, k2, v, v));
assertOpOnNode(nodeKey1, "delete", tx -> kvView.remove(tx, k1));
assertOpOnNode(nodeKey2, "delete", tx -> kvView.remove(tx, k2));
assertOpOnNode(nodeKey1, "deleteExact", tx -> kvView.remove(tx, k1, v));
assertOpOnNode(nodeKey2, "deleteExact", tx -> kvView.remove(tx, k2, v));
assertOpOnNode(nodeKey1, "deleteAll", tx -> kvView.removeAll(tx, List.of(k1)));
assertOpOnNode(nodeKey2, "deleteAll", tx -> kvView.removeAll(tx, List.of(k2)));
}
@Test
public void testAllKeyValueBinaryViewOperations() {
KeyValueView<Tuple, Tuple> kvView = defaultTable().keyValueView();
Tuple t1 = Tuple.create().set("ID", 1L);
Tuple t2 = Tuple.create().set("ID", 2L);
Tuple val = Tuple.create();
assertOpOnNode(nodeKey1, "insert", tx -> kvView.putIfAbsent(tx, t1, val));
assertOpOnNode(nodeKey2, "insert", tx -> kvView.putIfAbsent(tx, t2, val));
assertOpOnNode(nodeKey1, "upsert", tx -> kvView.put(tx, t1, val));
assertOpOnNode(nodeKey2, "upsert", tx -> kvView.put(tx, t2, val));
assertOpOnNode(nodeKey1, "upsertAll", tx -> kvView.putAll(tx, Map.of(t1, val)));
assertOpOnNode(nodeKey2, "upsertAll", tx -> kvView.putAll(tx, Map.of(t2, val)));
assertOpOnNode(nodeKey1, "get", tx -> kvView.get(tx, t1));
assertOpOnNode(nodeKey2, "get", tx -> kvView.get(tx, t2));
assertOpOnNode(nodeKey1, "get", tx -> kvView.contains(tx, t1));
assertOpOnNode(nodeKey2, "get", tx -> kvView.contains(tx, t2));
assertOpOnNode(nodeKey1, "getAll", tx -> kvView.getAll(tx, List.of(t1)));
assertOpOnNode(nodeKey2, "getAll", tx -> kvView.getAll(tx, List.of(t2)));
assertOpOnNode(nodeKey1, "getAndUpsert", tx -> kvView.getAndPut(tx, t1, val));
assertOpOnNode(nodeKey2, "getAndUpsert", tx -> kvView.getAndPut(tx, t2, val));
assertOpOnNode(nodeKey1, "getAndReplace", tx -> kvView.getAndReplace(tx, t1, val));
assertOpOnNode(nodeKey2, "getAndReplace", tx -> kvView.getAndReplace(tx, t2, val));
assertOpOnNode(nodeKey1, "getAndDelete", tx -> kvView.getAndRemove(tx, t1));
assertOpOnNode(nodeKey2, "getAndDelete", tx -> kvView.getAndRemove(tx, t2));
assertOpOnNode(nodeKey1, "replace", tx -> kvView.replace(tx, t1, val));
assertOpOnNode(nodeKey2, "replace", tx -> kvView.replace(tx, t2, val));
assertOpOnNode(nodeKey1, "replace", tx -> kvView.replace(tx, t1, val, val));
assertOpOnNode(nodeKey2, "replace", tx -> kvView.replace(tx, t2, val, val));
assertOpOnNode(nodeKey1, "delete", tx -> kvView.remove(tx, t1));
assertOpOnNode(nodeKey2, "delete", tx -> kvView.remove(tx, t2));
assertOpOnNode(nodeKey1, "deleteExact", tx -> kvView.remove(tx, t1, val));
assertOpOnNode(nodeKey2, "deleteExact", tx -> kvView.remove(tx, t2, val));
assertOpOnNode(nodeKey1, "deleteAll", tx -> kvView.removeAll(tx, List.of(t1)));
assertOpOnNode(nodeKey2, "deleteAll", tx -> kvView.removeAll(tx, List.of(t2)));
}
@Test
public void testExecuteColocatedTupleKeyRoutesRequestToPrimaryNode() {
Table table = defaultTable();
Tuple t1 = Tuple.create().set("ID", 1L);
Tuple t2 = Tuple.create().set("ID", 2L);
assertThat(compute().executeColocatedAsync(table.name(), t1, List.of(), "job"), willBe(nodeKey1));
assertThat(compute().executeColocatedAsync(table.name(), t2, List.of(), "job"), willBe(nodeKey2));
}
@Test
public void testExecuteColocatedObjectKeyRoutesRequestToPrimaryNode() {
var mapper = Mapper.of(Long.class);
Table table = defaultTable();
assertThat(compute().executeColocatedAsync(table.name(), 1L, mapper, List.of(), "job"), willBe(nodeKey1));
assertThat(compute().executeColocatedAsync(table.name(), 2L, mapper, List.of(), "job"), willBe(nodeKey2));
}
@Test
public void testDataStreamerRecordBinaryView() {
RecordView<Tuple> recordView = defaultTable().recordView();
Consumer<Tuple> stream = t -> {
CompletableFuture<Void> fut;
try (SimplePublisher<Tuple> publisher = new SimplePublisher<>()) {
fut = recordView.streamData(publisher, null);
publisher.submit(t);
}
fut.join();
};
assertOpOnNode(nodeKey0, "updateAll", tx -> stream.accept(Tuple.create().set("ID", 0L)));
assertOpOnNode(nodeKey1, "updateAll", tx -> stream.accept(Tuple.create().set("ID", 1L)));
assertOpOnNode(nodeKey2, "updateAll", tx -> stream.accept(Tuple.create().set("ID", 2L)));
assertOpOnNode(nodeKey3, "updateAll", tx -> stream.accept(Tuple.create().set("ID", 3L)));
}
@Test
public void testDataStreamerRecordView() {
RecordView<PersonPojo> pojoView = defaultTable().recordView(Mapper.of(PersonPojo.class));
Consumer<PersonPojo> stream = t -> {
CompletableFuture<Void> fut;
try (SimplePublisher<PersonPojo> publisher = new SimplePublisher<>()) {
fut = pojoView.streamData(publisher, null);
publisher.submit(t);
}
fut.join();
};
assertOpOnNode(nodeKey0, "updateAll", tx -> stream.accept(new PersonPojo(0L)));
assertOpOnNode(nodeKey1, "updateAll", tx -> stream.accept(new PersonPojo(1L)));
assertOpOnNode(nodeKey2, "updateAll", tx -> stream.accept(new PersonPojo(2L)));
assertOpOnNode(nodeKey3, "updateAll", tx -> stream.accept(new PersonPojo(3L)));
}
@Test
public void testDataStreamerKeyValueBinaryView() {
KeyValueView<Tuple, Tuple> recordView = defaultTable().keyValueView();
Consumer<Tuple> stream = t -> {
CompletableFuture<Void> fut;
try (SimplePublisher<Entry<Tuple, Tuple>> publisher = new SimplePublisher<>()) {
fut = recordView.streamData(publisher, null);
publisher.submit(Map.entry(t, Tuple.create()));
}
fut.join();
};
assertOpOnNode(nodeKey0, "updateAll", tx -> stream.accept(Tuple.create().set("ID", 0L)));
assertOpOnNode(nodeKey1, "updateAll", tx -> stream.accept(Tuple.create().set("ID", 1L)));
assertOpOnNode(nodeKey2, "updateAll", tx -> stream.accept(Tuple.create().set("ID", 2L)));
assertOpOnNode(nodeKey3, "updateAll", tx -> stream.accept(Tuple.create().set("ID", 3L)));
}
@Test
public void testDataStreamerKeyValueView() {
KeyValueView<Long, String> kvView = defaultTable().keyValueView(Mapper.of(Long.class), Mapper.of(String.class));
Consumer<Long> stream = t -> {
CompletableFuture<Void> fut;
try (SimplePublisher<Entry<Long, String>> publisher = new SimplePublisher<>()) {
fut = kvView.streamData(publisher, null);
publisher.submit(Map.entry(t, t.toString()));
}
fut.join();
};
assertOpOnNode(nodeKey0, "updateAll", tx -> stream.accept(0L));
assertOpOnNode(nodeKey1, "updateAll", tx -> stream.accept(1L));
assertOpOnNode(nodeKey2, "updateAll", tx -> stream.accept(2L));
assertOpOnNode(nodeKey3, "updateAll", tx -> stream.accept(3L));
}
@Test
public void testDataStreamerReceivesPartitionAssignmentUpdates() {
DataStreamerOptions options = DataStreamerOptions.builder()
.pageSize(1)
.perPartitionParallelOperations(1)
.autoFlushFrequency(50)
.build();
CompletableFuture<Void> fut;
RecordView<Tuple> recordView = defaultTable().recordView();
try (SubmissionPublisher<DataStreamerItem<Tuple>> publisher = new SubmissionPublisher<>()) {
fut = recordView.streamData(publisher, options);
Consumer<Long> submit = id -> {
try {
lastOpServerName = null;
publisher.submit(DataStreamerItem.of(Tuple.create().set("ID", id)));
assertTrue(IgniteTestUtils.waitForCondition(() -> lastOpServerName != null, 1000));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
assertOpOnNode(nodeKey1, "updateAll", tx -> submit.accept(1L));
assertOpOnNode(nodeKey2, "updateAll", tx -> submit.accept(2L));
// Update partition assignment.
initPrimaryReplicas(reversedReplicas());
// Send some batches so that the client receives updated assignment.
for (long i = 0; i < 10; i++) {
submit.accept(i);
}
// Check updated assignment.
assertOpOnNode(nodeKey2, "updateAll", tx -> submit.accept(1L));
assertOpOnNode(nodeKey1, "updateAll", tx -> submit.accept(2L));
}
fut.join();
}
private void assertOpOnNode(String expectedNode, String expectedOp, Consumer<Transaction> op) {
assertOpOnNodeNoTx(expectedNode, expectedOp, op);
assertOpOnNodeWithTx(expectedNode, expectedOp, op);
}
private void assertOpOnNodeNoTx(String expectedNode, String expectedOp, Consumer<Transaction> op) {
lastOpServerName = null;
lastOp = null;
op.accept(null);
assertEquals(expectedOp, lastOp);
assertEquals(expectedNode, lastOpServerName, "Operation " + expectedOp + " was not executed on expected node");
}
private void assertOpOnNodeWithTx(String expectedNode, String expectedOp, Consumer<Transaction> op) {
lastOpServerName = null;
lastOp = null;
Transaction tx = client.transactions().begin();
op.accept(null);
tx.commit();
assertEquals(expectedOp, lastOp);
assertEquals(expectedNode, lastOpServerName, "Operation " + expectedOp + " was not executed on expected node with transaction");
}
private Table defaultTable() {
return table(DEFAULT_TABLE);
}
private Table table(String name) {
// Create table on both servers with the same ID.
int tableId = nextTableId.getAndIncrement();
createTable(server, tableId, name);
createTable(server2, tableId, name);
return client2.tables().table(name);
}
private static IgniteCompute compute() {
return client2.compute();
}
private void createTable(Ignite ignite, int id, String name) {
FakeIgniteTables tables = (FakeIgniteTables) ignite.tables();
TableViewInternal tableView = tables.createTable(name, id);
((FakeInternalTable) tableView.internalTable()).setDataAccessListener((op, data) -> {
lastOp = op;
lastOpServerName = ignite.name();
});
}
private static void initPrimaryReplicas(@Nullable List<String> replicas) {
long leaseStartTime = new HybridClockImpl().nowLong();
initPrimaryReplicas(testServer.placementDriver(), replicas, leaseStartTime);
initPrimaryReplicas(testServer2.placementDriver(), replicas, leaseStartTime);
}
private static void initPrimaryReplicas(FakePlacementDriver placementDriver, @Nullable List<String> replicas, long leaseStartTime) {
if (replicas == null) {
replicas = defaultReplicas();
}
placementDriver.setReplicas(replicas, nextTableId.get() - 1, leaseStartTime);
}
private static List<String> defaultReplicas() {
return List.of(testServer.nodeName(), testServer2.nodeName(), testServer.nodeName(), testServer2.nodeName());
}
private static List<String> reversedReplicas() {
return List.of(testServer2.nodeName(), testServer.nodeName(), testServer2.nodeName(), testServer.nodeName());
}
}