// blob: 18f70582f7c01e64649eeaa0490d5edee8344569 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.omid.tso.client;
import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.omid.TestUtils;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.proto.TSOProto;
import org.apache.omid.tso.PausableTimestampOracle;
import org.apache.omid.tso.TSOMockModule;
import org.apache.omid.tso.TSOServer;
import org.apache.omid.tso.TSOServerConfig;
import org.apache.omid.tso.TimestampOracle;
import org.apache.omid.tso.util.DummyCellIdImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
public class TestTSOClientRequestAndResponseBehaviours {
/** Logger shared by all tests in this class. */
private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientRequestAndResponseBehaviours.class);

// Endpoint the test TSO server is configured with and the test clients connect to
private static final String TSO_SERVER_HOST = "localhost";
private static final int TSO_SERVER_PORT = 1234;

// Fixed two-cell write set sent with the commit requests built by these tests
private static final CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
private static final CellId c2 = new DummyCellIdImpl(0xfeedcafeL);
private static final Set<CellId> testWriteSet = Sets.newHashSet(c1, c2);

// Default client configuration pointing at the test TSO server; rebuilt in beforeMethod()
private OmidClientConfiguration tsoClientConf;

// Required infrastructure for TSOClient test (all obtained from the Guice injector in beforeMethod())
private TSOServer tsoServer;
private PausableTimestampOracle pausableTSOracle;
private CommitTable commitTable;
/**
 * Per-test fixture: starts a TSO server built from {@link TSOMockModule} on
 * {@code TSO_SERVER_HOST:TSO_SERVER_PORT}, waits until its socket is listening, and stores the
 * oracle, the commit table and a default client configuration in the test fields.
 *
 * @throws Exception if the server cannot be started or the socket wait fails
 */
@BeforeMethod
public void beforeMethod() throws Exception {
    final String banner =
            "==================================================================================================";

    TSOServerConfig serverConfig = new TSOServerConfig();
    serverConfig.setConflictMapSize(1000);
    serverConfig.setPort(TSO_SERVER_PORT);
    serverConfig.setNumConcurrentCTWriters(2);

    Injector injector = Guice.createInjector(new TSOMockModule(serverConfig));

    LOG.info(banner);
    LOG.info("======================================= Init TSO Server ==========================================");
    LOG.info(banner);

    tsoServer = injector.getInstance(TSOServer.class);
    tsoServer.startAsync();
    tsoServer.awaitRunning();
    TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);

    LOG.info(banner);
    LOG.info("===================================== TSO Server Initialized =====================================");
    LOG.info(banner);

    // The cast relies on the mock module binding TimestampOracle to a pausable implementation
    pausableTSOracle = (PausableTimestampOracle) injector.getInstance(TimestampOracle.class);
    commitTable = injector.getInstance(CommitTable.class);

    OmidClientConfiguration clientConf = new OmidClientConfiguration();
    clientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
    this.tsoClientConf = clientConf;
}
/**
 * Per-test teardown: stops the TSO server, waits until its socket is released and resumes the
 * timestamp oracle.
 *
 * The oracle is resumed in a {@code finally} block so that a failure while shutting the server
 * down cannot leave it paused (several tests pause it and some never resume it themselves).
 *
 * @throws Exception if the server does not terminate cleanly or the socket wait fails
 */
@AfterMethod
public void afterMethod() throws Exception {
    try {
        tsoServer.stopAsync();
        tsoServer.awaitTerminated();
        tsoServer = null;
        TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);
    } finally {
        pausableTSOracle.resume();
    }
}
/**
 * Test to ensure TSOClient timeouts are cancelled.
 * At some point a bug was detected because the TSOClient timeouts were not cancelled, and as timestamp requests
 * had no way to be correlated to timestamp responses, random requests were just timed out after a certain time.
 * We send a lot of timestamp requests, and wait for them to complete.
 * Ensure that the next request doesn't get hit by the timeouts of the previous
 * requests. (i.e. make sure we cancel timeouts)
 *
 * The client is created from a dedicated configuration that carries the request timeout and
 * max retries declared below, so the sleeps (fractions of that timeout) are measured against
 * the timeout the client is really using.
 */
@Test(timeOut = 30_000)
public void testTimeoutsAreCancelled() throws Exception {
    int requestTimeoutInMs = 500;
    int requestMaxRetries = 5;

    // Apply the values to the client instead of only logging them
    OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
    testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
    testTSOClientConf.setRequestTimeoutInMs(requestTimeoutInMs);
    testTSOClientConf.setRequestMaxRetries(requestMaxRetries);
    TSOClient client = TSOClient.newInstance(testTSOClientConf);
    LOG.info("Request timeout {} ms; Max retries {}", requestTimeoutInMs, requestMaxRetries);

    // Fire a burst of requests and wait for the last one to complete
    Future<Long> f = null;
    for (int i = 0; i < (requestMaxRetries * 10); i++) {
        f = client.getNewStartTimestamp();
    }
    if (f != null) {
        f.get();
    }

    pausableTSOracle.pause();
    long msToSleep = ((long) (requestTimeoutInMs * 0.75));
    LOG.info("Sleeping for {} ms", msToSleep);
    TimeUnit.MILLISECONDS.sleep(msToSleep);

    // Issued 0.75 timeouts after the burst; resumed 0.9 timeouts later, i.e. after the burst's
    // timeouts would have fired but before this request's own timeout elapses
    f = client.getNewStartTimestamp();
    msToSleep = ((long) (requestTimeoutInMs * 0.9));
    LOG.info("Sleeping for {} ms", msToSleep);
    TimeUnit.MILLISECONDS.sleep(msToSleep);
    LOG.info("Resuming");
    pausableTSOracle.resume();
    f.get();
}
/**
 * With retries disabled, commits that are in flight when the client channel is closed must
 * complete exceptionally with a {@link ServiceUnavailableException} as the cause.
 */
@Test(timeOut = 30_000)
public void testCommitGetsServiceUnavailableExceptionWhenCommunicationFails() throws Exception {
    OmidClientConfiguration noRetryConf = new OmidClientConfiguration();
    noRetryConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
    noRetryConf.setRequestMaxRetries(0);
    TSOClient client = TSOClient.newInstance(noRetryConf);

    // Obtain ten start timestamps while the oracle is still running
    List<Long> startTimestamps = new ArrayList<>();
    while (startTimestamps.size() < 10) {
        Long startTimestamp = client.getNewStartTimestamp().get();
        startTimestamps.add(startTimestamp);
    }

    // Pause the oracle, queue one commit per timestamp, then drop the channel underneath them
    pausableTSOracle.pause();
    List<Future<Long>> pendingCommits = new ArrayList<>();
    for (long startTimestamp : startTimestamps) {
        Future<Long> pendingCommit = client.commit(startTimestamp, Sets.<CellId>newHashSet());
        pendingCommits.add(pendingCommit);
    }
    TSOClientAccessor.closeChannel(client);

    for (Future<Long> pendingCommit : pendingCommits) {
        try {
            pendingCommit.get();
            fail("Shouldn't be able to complete");
        } catch (ExecutionException expected) {
            boolean serviceUnavailable = expected.getCause() instanceof ServiceUnavailableException;
            assertTrue(serviceUnavailable, "Should be a service unavailable exception");
        }
    }
}
/**
 * Test that if a client tries to make a request without handshaking, it will be disconnected.
 *
 * The raw client is closed in a {@code finally} block so the connection is released even when
 * one of the assertions fails.
 */
@Test(timeOut = 30_000)
public void testHandshakeBetweenOldClientAndCurrentServer() throws Exception {
    TSOClientRaw raw = new TSOClientRaw(TSO_SERVER_HOST, TSO_SERVER_PORT);
    try {
        // Send a timestamp request straight away, with no handshake first
        TSOProto.Request request = TSOProto.Request.newBuilder()
                .setTimestampRequest(TSOProto.TimestampRequest.newBuilder().build())
                .build();
        raw.write(request);
        try {
            raw.getResponse().get();
            fail("Channel should be closed");
        } catch (ExecutionException ee) {
            assertEquals(ee.getCause().getClass(), ConnectionException.class, "Should be channel closed exception");
        }
    } finally {
        raw.close();
    }
}
// ----------------------------------------------------------------------------------------------------------------
// Test duplicate commits
// ----------------------------------------------------------------------------------------------------------------
/**
 * This tests the case where messages arrive at the TSO out of order. This can happen when the
 * channel gets dropped and the retry is sent over a new channel. The TSO will then respond with
 * "aborted" to the original message, because the retry was already committed and it would be
 * prohibitively expensive to check all non-retry requests to see if they are already committed.
 * For this reason a client must ensure that, if it is sending a retry due to a socket error, the
 * previous channel is entirely closed so that it will not actually receive the abort response.
 * TCP guarantees that this doesn't happen in non-socket error cases.
 */
@Test(timeOut = 30_000)
public void testOutOfOrderMessages() throws Exception {
    TSOClient client = TSOClient.newInstance(tsoClientConf);
    TSOClientOneShot oneShotClient = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);

    long startTimestamp = client.getNewStartTimestamp().get();

    // The retry is delivered first, the original (non-retry) request second
    TSOProto.Request retryRequest = createCommitRequest(startTimestamp, true, testWriteSet);
    TSOProto.Response retryResponse = oneShotClient.makeRequest(retryRequest);
    TSOProto.Request originalRequest = createCommitRequest(startTimestamp, false, testWriteSet);
    TSOProto.Response originalResponse = oneShotClient.makeRequest(originalRequest);

    assertFalse(retryResponse.getCommitResponse().getAborted(), "Retry Transaction should commit");
    assertTrue(originalResponse.getCommitResponse().getAborted(), "Transaction should abort");
}
/**
 * A later transaction commits the shared write set first; the earlier transaction's commit and
 * its retry, both sent afterwards, are then both expected to be answered with an abort.
 */
@Test(timeOut = 30_000)
public void testDuplicateCommitAborting() throws Exception {
    TSOClient client = TSOClient.newInstance(tsoClientConf);
    TSOClientOneShot oneShotClient = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);

    long olderStartTimestamp = client.getNewStartTimestamp().get();
    long newerStartTimestamp = client.getNewStartTimestamp().get();

    // The newer transaction commits the same cells before the older one tries to
    client.commit(newerStartTimestamp, testWriteSet).get();

    TSOProto.Request firstAttempt = createCommitRequest(olderStartTimestamp, false, testWriteSet);
    TSOProto.Response firstResponse = oneShotClient.makeRequest(firstAttempt);
    TSOProto.Request retryAttempt = createCommitRequest(olderStartTimestamp, true, testWriteSet);
    TSOProto.Response retryResponse = oneShotClient.makeRequest(retryAttempt);

    assertTrue(firstResponse.getCommitResponse().getAborted(), "Transaction should abort");
    assertTrue(retryResponse.getCommitResponse().getAborted(), "Retry commit should abort");
}
/**
 * Sends a commit followed by a retry of the same commit. In the regular mode both responses
 * must carry the same commit timestamp; in low-latency mode the first response must contain a
 * commit response and the retry must be reported as aborted.
 */
@Test(timeOut = 30_000)
public void testDuplicateCommit() throws Exception {
    TSOClient client = TSOClient.newInstance(tsoClientConf);
    TSOClientOneShot oneShotClient = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);

    long startTimestamp = client.getNewStartTimestamp().get();

    TSOProto.Request commitRequest = createCommitRequest(startTimestamp, false, testWriteSet);
    TSOProto.Response response1 = oneShotClient.makeRequest(commitRequest);
    TSOProto.Request retryRequest = createCommitRequest(startTimestamp, true, testWriteSet);
    TSOProto.Response response2 = oneShotClient.makeRequest(retryRequest);

    if (!client.isLowLatency()) {
        assertEquals(response2.getCommitResponse().getCommitTimestamp(),
                response1.getCommitResponse().getCommitTimestamp(),
                "Commit timestamp should be the same");
    } else {
        assertTrue(response1.hasCommitResponse());
        assertTrue(response2.getCommitResponse().getAborted());
    }
}
// ----------------------------------------------------------------------------------------------------------------
// Test TSOClient retry behaviour
// ----------------------------------------------------------------------------------------------------------------
/**
 * A commit issued while the oracle is paused must still complete after the client channel is
 * closed and the oracle is resumed. The scenario is not exercised in low-latency mode.
 */
@Test(timeOut = 30_000)
public void testCommitCanSucceedWhenChannelDisconnected() throws Exception {
    TSOClient client = TSOClient.newInstance(tsoClientConf);
    long startTimestamp = client.getNewStartTimestamp().get();

    if (client.isLowLatency()) {
        return;
    }

    pausableTSOracle.pause();
    TSOFuture<Long> pendingCommit = client.commit(startTimestamp, testWriteSet);
    // Drop the channel while the commit is still outstanding
    TSOClientAccessor.closeChannel(client);
    pausableTSOracle.resume();

    pendingCommit.get();
}
/**
 * With a 100 ms request timeout and 10000 allowed retries, a commit issued while the oracle is
 * paused for one second must still complete once the oracle is resumed.
 */
@Test(timeOut = 30_000)
public void testCommitCanSucceedWithMultipleTimeouts() throws Exception {
    OmidClientConfiguration shortTimeoutConf = new OmidClientConfiguration();
    shortTimeoutConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
    shortTimeoutConf.setRequestTimeoutInMs(100);
    shortTimeoutConf.setRequestMaxRetries(10000);
    TSOClient client = TSOClient.newInstance(shortTimeoutConf);

    long startTimestamp = client.getNewStartTimestamp().get();

    pausableTSOracle.pause();
    TSOFuture<Long> pendingCommit = client.commit(startTimestamp, testWriteSet);
    // One second is ten times the configured request timeout
    TimeUnit.SECONDS.sleep(1);
    pausableTSOracle.resume();

    pendingCommit.get();
}
/**
 * With a 100 ms request timeout and 10 retries, a commit issued while the oracle stays paused
 * must complete exceptionally with a {@link ServiceUnavailableException} as the cause.
 *
 * The test fails explicitly if the commit future completes normally; without that check a
 * commit that unexpectedly succeeded would let the test pass without verifying anything.
 */
@Test(timeOut = 30_000)
public void testCommitFailWhenTSOIsDown() throws Exception {
    OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
    testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
    testTSOClientConf.setRequestTimeoutInMs(100);
    testTSOClientConf.setRequestMaxRetries(10);
    TSOClient client = TSOClient.newInstance(testTSOClientConf);

    long ts1 = client.getNewStartTimestamp().get();

    // The oracle is deliberately left paused for the rest of the test
    pausableTSOracle.pause();
    TSOFuture<Long> future = client.commit(ts1, testWriteSet);
    try {
        future.get();
        fail("Commit shouldn't be able to complete while the TSO is not responding");
    } catch (ExecutionException e) {
        assertEquals(e.getCause().getClass(), ServiceUnavailableException.class,
                "Should be a ServiceUnavailableExeption");
    }
}
/**
 * With a 100 ms request timeout and 10000 allowed retries, a start-timestamp request issued
 * while the oracle is paused for one second must still complete once the oracle is resumed.
 */
@Test(timeOut = 30_000)
public void testTimestampRequestSucceedWithMultipleTimeouts() throws Exception {
    OmidClientConfiguration shortTimeoutConf = new OmidClientConfiguration();
    shortTimeoutConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
    shortTimeoutConf.setRequestTimeoutInMs(100);
    shortTimeoutConf.setRequestMaxRetries(10000);
    TSOClient client = TSOClient.newInstance(shortTimeoutConf);

    pausableTSOracle.pause();
    Future<Long> pendingTimestamp = client.getNewStartTimestamp();
    // One second is ten times the configured request timeout
    TimeUnit.SECONDS.sleep(1);
    pausableTSOracle.resume();

    pendingTimestamp.get();
}
// ----------------------------------------------------------------------------------------------------------------
// The next 3 tests are similar to the ones in TestRetryProcessor but checking the result on the TSOClient side
// (They exercise the communication protocol) TODO Remove???
// ----------------------------------------------------------------------------------------------------------------
/**
 * Sends the same retry-commit request twice and checks the second response. In the regular
 * mode it must be a commit whose timestamp is the start timestamp plus
 * {@code CommitTable.MAX_CHECKPOINTS_PER_TXN}; in low-latency mode it must be an abort.
 */
@Test(timeOut = 30_000)
public void testCommitTimestampPresentInCommitTableReturnsCommit() throws Exception {
    TSOClient client = TSOClient.newInstance(tsoClientConf);
    TSOClientOneShot oneShotClient = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);

    long startTimestamp = client.getNewStartTimestamp().get();

    // The first request's response is not inspected; only the second one is checked
    oneShotClient.makeRequest(createRetryCommitRequest(startTimestamp));
    TSOProto.Response secondResponse = oneShotClient.makeRequest(createRetryCommitRequest(startTimestamp));

    if (!client.isLowLatency()) {
        assertFalse(secondResponse.getCommitResponse().getAborted(), "Transaction should be committed");
        assertEquals(secondResponse.getCommitResponse().getCommitTimestamp(),
                startTimestamp + CommitTable.MAX_CHECKPOINTS_PER_TXN);
    } else {
        assertTrue(secondResponse.getCommitResponse().getAborted(), "Transaction should be aborted");
    }
}
/**
 * Invalidates a transaction through the commit table client, then sends its retry-commit
 * request twice; the second response must be an abort with a commit timestamp of 0.
 */
@Test(timeOut = 30_000)
public void testInvalidCommitTimestampPresentInCommitTableReturnsAbort() throws Exception {
    TSOClient client = TSOClient.newInstance(tsoClientConf);
    TSOClientOneShot oneShotClient = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);

    long startTimestamp = client.getNewStartTimestamp().get();

    // Invalidate the transaction before any commit request reaches the TSO
    commitTable.getClient().tryInvalidateTransaction(startTimestamp);

    oneShotClient.makeRequest(createRetryCommitRequest(startTimestamp));
    TSOProto.Response secondResponse = oneShotClient.makeRequest(createRetryCommitRequest(startTimestamp));

    assertTrue(secondResponse.getCommitResponse().getAborted(), "Transaction should be aborted");
    assertEquals(secondResponse.getCommitResponse().getCommitTimestamp(), 0);
}
/**
 * Sends a retry-commit request, deletes the transaction's entry from the commit table, and
 * sends the retry again; the second response must be an abort with a commit timestamp of 0.
 */
@Test(timeOut = 30_000)
public void testCommitTimestampNotPresentInCommitTableReturnsAnAbort() throws Exception {
    TSOClient client = TSOClient.newInstance(tsoClientConf);
    TSOClientOneShot oneShotClient = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);

    long startTimestamp = client.getNewStartTimestamp().get();
    oneShotClient.makeRequest(createRetryCommitRequest(startTimestamp));

    // Simulate the entry having been removed from the commit table before the retry is exercised
    commitTable.getClient().deleteCommitEntry(startTimestamp);

    TSOProto.Response retryResponse = oneShotClient.makeRequest(createRetryCommitRequest(startTimestamp));

    assertTrue(retryResponse.getCommitResponse().getAborted(), "Transaction should abort");
    assertEquals(retryResponse.getCommitResponse().getCommitTimestamp(), 0);
}
// ----------------------------------------------------------------------------------------------------------------
// The previous 3 tests are similar to the ones in TestRetryProcessor but checking the result on the TSOClient side
// (They exercise the communication protocol) TODO Remove???
// ----------------------------------------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------------------------------------
// Helper methods
// ----------------------------------------------------------------------------------------------------------------
/**
 * Builds a commit request flagged as a retry for the shared {@code testWriteSet}.
 *
 * @param ts start timestamp of the transaction being committed
 * @return the request wrapping the retry commit
 */
private TSOProto.Request createRetryCommitRequest(long ts) {
    final boolean isRetry = true;
    return createCommitRequest(ts, isRetry, testWriteSet);
}
/**
 * Builds a protocol request wrapping a commit request.
 *
 * @param ts       start timestamp of the transaction being committed
 * @param retry    value of the commit request's retry flag
 * @param writeSet cells whose ids are added to the commit request
 * @return the request carrying the commit
 */
private TSOProto.Request createCommitRequest(long ts, boolean retry, Set<CellId> writeSet) {
    TSOProto.CommitRequest.Builder commit = TSOProto.CommitRequest.newBuilder()
            .setStartTimestamp(ts)
            .setIsRetry(retry);
    for (CellId writtenCell : writeSet) {
        commit.addCellId(writtenCell.getCellId());
    }
    return TSOProto.Request.newBuilder()
            .setCommitRequest(commit.build())
            .build();
}
}