blob: ccd9e737b4326d5abcbd2a5a297ba1bb8cd3a2ed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.fluo.integration.impl;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.oracle.OracleClient;
import org.apache.fluo.core.oracle.OracleServer;
import org.apache.fluo.core.util.HostUtil;
import org.apache.fluo.core.util.PortUtils;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class OracleIT extends ITBaseImpl {
private static final String THRIFT_SERVER_LOGGER_NAME =
"org.apache.fluo.core.shaded.thrift.server";
@Rule
public Timeout globalTimeout = Timeout.seconds(getTestTimeout());
private Level curLevel;
@Test
public void testRestart() throws Exception {
OracleClient client = env.getSharedResources().getOracleClient();
long ts1 = client.getStamp().getTxTimestamp();
long ts2 = client.getStamp().getTxTimestamp();
oserver.stop();
oserver.start();
long ts3 = client.getStamp().getTxTimestamp();
long ts4 = client.getStamp().getTxTimestamp();
assertTrue(ts1 + " " + ts2, ts1 < ts2);
assertTrue(ts2 + " " + ts3, ts2 < ts3);
assertTrue(ts3 + " " + ts4, ts3 < ts4);
}
private static class TimestampFetcher implements Runnable {
private int numToGet;
private Environment env;
private List<Long> output;
private CountDownLatch cdl;
TimestampFetcher(int numToGet, Environment env, List<Long> output, CountDownLatch cdl) {
this.numToGet = numToGet;
this.env = env;
this.output = output;
this.cdl = cdl;
}
@Override
public void run() {
OracleClient oclient = env.getSharedResources().getOracleClient();
for (int i = 0; i < numToGet; i++) {
try {
output.add(oclient.getStamp().getTxTimestamp());
} catch (Exception e) {
e.printStackTrace();
}
}
cdl.countDown();
}
}
@Before
public void disableLogger() {
curLevel = Logger.getLogger(THRIFT_SERVER_LOGGER_NAME).getLevel();
Logger.getLogger(THRIFT_SERVER_LOGGER_NAME).setLevel(Level.FATAL);
}
@After
public void enableLogger() {
Logger.getLogger(THRIFT_SERVER_LOGGER_NAME).setLevel(curLevel);
}
/**
* Test that bogus input into the oracle server doesn't cause an OOM exception. This essentially
* tests for THRIFT-602
*/
@Test
public void bogusDataTest() throws Exception {
Socket socket = new Socket();
socket.connect(new InetSocketAddress(HostUtil.getHostName(), oserver.getPort()));
OutputStream outstream = socket.getOutputStream();
try (PrintWriter out = new PrintWriter(outstream)) {
out.print("abcd");
out.flush();
}
socket.close();
OracleClient client = env.getSharedResources().getOracleClient();
assertEquals(2, client.getStamp().getTxTimestamp());
}
@Test
public void threadTest() throws Exception {
int numThreads = 20;
int numTimes = 100;
List<Long> output = Collections.synchronizedList(new ArrayList<Long>());
ExecutorService tpool = Executors.newFixedThreadPool(numThreads);
CountDownLatch cdl = new CountDownLatch(numThreads);
for (int i = 0; i < numThreads; i++) {
tpool.execute(new TimestampFetcher(numTimes, env, output, cdl));
}
cdl.await();
TreeSet<Long> ts1 = new TreeSet<>(output);
assertEquals(numThreads * numTimes, ts1.size());
cdl = new CountDownLatch(numThreads);
output.clear();
for (int i = 0; i < numThreads; i++) {
tpool.execute(new TimestampFetcher(numTimes, env, output, cdl));
}
cdl.await();
TreeSet<Long> ts2 = new TreeSet<>(output);
assertEquals(numThreads * numTimes, ts2.size());
assertTrue(ts1.last() < ts2.first());
tpool.shutdown();
}
/**
* If multiple {@link org.apache.fluo.core.oracle.OracleServer} instances are competing leadership
* and fail, the {@link OracleClient} should failover to them as they go down and serve up new
* blocks of timestamps.
*/
@Test
public void failover_newTimestampRequested() throws Exception {
sleepUntilConnected(oserver);
int port2 = PortUtils.getRandomFreePort();
int port3 = PortUtils.getRandomFreePort();
TestOracle oserver2 = createExtraOracle(port2);
TestOracle oserver3 = createExtraOracle(port3);
oserver2.start();
sleepUntilConnected(oserver2);
oserver3.start();
sleepUntilConnected(oserver3);
OracleClient client = env.getSharedResources().getOracleClient();
long timestamp;
for (long i = 2; i <= 7; i++) {
timestamp = client.getStamp().getTxTimestamp();
assertEquals(i, timestamp);
}
assertTrue(client.getOracle().endsWith(Integer.toString(oserver.getPort())));
oserver.stop();
sleepWhileConnected(oserver);
assertEquals(1002, client.getStamp().getTxTimestamp());
assertTrue(client.getOracle().endsWith(Integer.toString(port2)));
oserver2.stop();
sleepWhileConnected(oserver2);
oserver2.close();
assertEquals(2002, client.getStamp().getTxTimestamp());
assertTrue(client.getOracle().endsWith(Integer.toString(port3)));
oserver3.stop();
oserver3.close();
}
/**
* If an {@link OracleServer} goes away and comes back, the client should automatically reconnect
* and start a new block of timestamps (making sure that no timestamp should ever go backwards).
*/
@Test
public void singleOracle_goesAwayAndComesBack() throws Exception {
sleepUntilConnected(oserver);
OracleClient client = env.getSharedResources().getOracleClient();
long timestamp;
for (long i = 2; i <= 7; i++) {
timestamp = client.getStamp().getTxTimestamp();
assertEquals(i, timestamp);
}
oserver.stop();
sleepWhileConnected(oserver);
while (client.getOracle() != null) {
Thread.sleep(100);
}
assertNull(client.getOracle());
oserver.start();
sleepUntilConnected(oserver);
assertEquals(1002, client.getStamp().getTxTimestamp());
assertTrue(client.getOracle().endsWith(Integer.toString(oserver.getPort())));
oserver.stop();
}
@Test
public void threadFailoverTest() throws Exception {
int numThreads = 20;
int numTimes = 100;
List<Long> output = Collections.synchronizedList(new ArrayList<Long>());
ExecutorService tpool = Executors.newFixedThreadPool(numThreads);
CountDownLatch cdl = new CountDownLatch(numThreads);
int port2 = PortUtils.getRandomFreePort();
int port3 = PortUtils.getRandomFreePort();
TestOracle oserver2 = createExtraOracle(port2);
oserver2.start();
sleepUntilConnected(oserver2);
TestOracle oserver3 = createExtraOracle(port3);
oserver3.start();
sleepUntilConnected(oserver3);
for (int i = 0; i < numThreads; i++) {
tpool.execute(new TimestampFetcher(numTimes, env, output, cdl));
if (i == 10) {
oserver.stop();
}
}
cdl.await();
TreeSet<Long> ts1 = new TreeSet<>(output);
assertEquals(numThreads * numTimes, ts1.size());
cdl = new CountDownLatch(numThreads);
output.clear();
for (int i = 0; i < numThreads; i++) {
tpool.execute(new TimestampFetcher(numTimes, env, output, cdl));
if (i == 5) {
oserver2.stop();
}
}
oserver2.close();
cdl.await();
TreeSet<Long> ts2 = new TreeSet<>(output);
assertEquals(numThreads * numTimes, ts2.size());
assertTrue(ts1.last() < ts2.first());
tpool.shutdown();
oserver3.stop();
oserver3.close();
}
private void sleepWhileConnected(OracleServer oserver) throws InterruptedException {
while (oserver.isConnected()) {
Thread.sleep(100);
}
}
private void sleepUntilConnected(OracleServer oserver) throws InterruptedException {
while (!oserver.isConnected()) {
Thread.sleep(100);
}
}
}