| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.client; |
| |
| import static org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE; |
| import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; |
| import static org.apache.ignite.internal.util.IgniteUtils.closeAll; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertNotNull; |
| import static org.junit.jupiter.api.Assertions.assertNull; |
| |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicLong; |
| import org.apache.ignite.client.fakes.FakeIgnite; |
| import org.apache.ignite.internal.TestHybridClock; |
| import org.apache.ignite.internal.client.ReliableChannel; |
| import org.apache.ignite.internal.client.tx.ClientLazyTransaction; |
| import org.apache.ignite.internal.hlc.HybridTimestamp; |
| import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; |
| import org.apache.ignite.internal.testframework.IgniteTestUtils; |
| import org.apache.ignite.sql.Statement; |
| import org.apache.ignite.sql.async.AsyncResultSet; |
| import org.apache.ignite.tx.TransactionOptions; |
| import org.jetbrains.annotations.Nullable; |
| import org.junit.jupiter.api.AfterAll; |
| import org.junit.jupiter.api.BeforeAll; |
| import org.junit.jupiter.api.Test; |
| |
| /** |
| * Tests that observable timestamp (causality token) is propagated from server to client and back. |
| */ |
| public class ObservableTimestampPropagationTest extends BaseIgniteAbstractTest { |
| private static TestServer testServer; |
| |
| private static FakeIgnite ignite; |
| |
| private static IgniteClient client; |
| |
| private static final AtomicLong currentServerTimestamp = new AtomicLong(1); |
| |
| @BeforeAll |
| public static void startServer2() { |
| TestHybridClock clock = new TestHybridClock(currentServerTimestamp::get); |
| |
| ignite = new FakeIgnite("server-2"); |
| testServer = new TestServer(0, ignite, null, null, "server-2", UUID.randomUUID(), null, null, clock); |
| |
| client = IgniteClient.builder().addresses("127.0.0.1:" + testServer.port()).build(); |
| } |
| |
| @AfterAll |
| public static void stopServer2() throws Exception { |
| closeAll(client, testServer, ignite); |
| } |
| |
| @Test |
| @SuppressWarnings("resource") |
| public void testClientPropagatesLatestKnownHybridTimestamp() { |
| ReliableChannel ch = IgniteTestUtils.getFieldValue(client, "ch"); |
| TransactionOptions roOpts = new TransactionOptions().readOnly(true); |
| |
| assertNull(lastObservableTimestamp()); |
| |
| // RW TX does not propagate timestamp. |
| var rwTx = client.transactions().begin(); |
| ClientLazyTransaction.ensureStarted(rwTx, ch, null).join(); |
| assertNull(lastObservableTimestamp()); |
| |
| // RO TX propagates timestamp. |
| var roTx = client.transactions().begin(roOpts); |
| ClientLazyTransaction.ensureStarted(roTx, ch, null).join(); |
| assertEquals(1, lastObservableTimestamp()); |
| |
| // Increase timestamp on server - client does not know about it initially. |
| currentServerTimestamp.set(11); |
| ClientLazyTransaction.ensureStarted(client.transactions().begin(roOpts), ch, null).join(); |
| assertEquals(1, lastObservableTimestamp()); |
| |
| // Subsequent RO TX propagates latest known timestamp. |
| client.tables().tables(); |
| ClientLazyTransaction.ensureStarted(client.transactions().begin(roOpts), ch, null).join(); |
| assertEquals(11, lastObservableTimestamp()); |
| |
| // Smaller timestamp from server is ignored by client. |
| currentServerTimestamp.set(9); |
| ClientLazyTransaction.ensureStarted(client.transactions().begin(roOpts), ch, null).join(); |
| ClientLazyTransaction.ensureStarted(client.transactions().begin(roOpts), ch, null).join(); |
| assertEquals(11, lastObservableTimestamp()); |
| |
| Statement statement = client.sql().statementBuilder() |
| .query("SELECT 1") |
| .build(); |
| |
| // Execution of a SQL query should propagate observable time, not the current time of the clock. |
| currentServerTimestamp.set(20); |
| updateObservableTimestamp(14); |
| AsyncResultSet<?> rs = await(client.sql().executeAsync(null, statement)); |
| assertEquals(14, lastObservableTimestamp()); |
| |
| assertNotNull(rs); |
| |
| // Every fetch should propagate observable time, not the current time of the clock. |
| currentServerTimestamp.set(20); |
| updateObservableTimestamp(18); |
| await(rs.fetchNextPage()); |
| assertEquals(18, lastObservableTimestamp()); |
| |
| currentServerTimestamp.set(24); |
| updateObservableTimestamp(20); |
| await(rs.fetchNextPage()); |
| assertEquals(20, lastObservableTimestamp()); |
| |
| // Closing a result set should propagate observable time as well. |
| updateObservableTimestamp(22); |
| await(rs.closeAsync()); |
| assertEquals(22, lastObservableTimestamp()); |
| } |
| |
| private static @Nullable Long lastObservableTimestamp() { |
| HybridTimestamp ts = ignite.timestampTracker().get(); |
| |
| return ts == null ? null : ts.longValue() >> LOGICAL_TIME_BITS_SIZE; |
| } |
| |
| private static void updateObservableTimestamp(long newTime) { |
| ignite.timestampTracker().update(HybridTimestamp.hybridTimestamp(newTime << LOGICAL_TIME_BITS_SIZE)); |
| } |
| } |