blob: 4d239dc6a2e40ffa8cbdf3d0854ec3e77f14b622 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openejb.itest.legacy;
import org.apache.openejb.client.Client;
import org.apache.openejb.client.RemoteInitialContextFactory;
import org.apache.openejb.client.event.ClusterMetaDataUpdated;
import org.apache.openejb.client.event.Observes;
import org.apache.openejb.itest.failover.Repository;
import org.apache.openejb.itest.failover.ejb.Calculator;
import org.apache.openejb.loader.Files;
import org.apache.openejb.loader.IO;
import org.apache.openejb.loader.Zips;
import org.apache.openejb.server.control.StandaloneServer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.ejb.EJBException;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.io.File;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
import static org.apache.openejb.util.NetworkUtil.getNextAvailablePort;
public class LegacyClientTest {
private static final Map<String, StandaloneServer> servers = new HashMap<String, StandaloneServer>();
private static StandaloneServer root = null;
private static final String rootname = "root";
private static final File dir = Files.tmpdir();
private static File zip = null;
private static File app = null;
private static final Logger logger = Logger.getLogger("org.apache.openejb.client");
static {
final ConsoleHandler consoleHandler = new ConsoleHandler();
consoleHandler.setLevel(Level.FINER);
logger.addHandler(consoleHandler);
logger.setLevel(Level.FINER);
logger.setUseParentHandlers(false);
}
@BeforeClass
public static void beforeClass() throws Exception {
logger.info("Retrieving standalone server: " + Repository.guessVersion("org.apache.tomee", "openejb-standalone") + " - This may take a while...");
zip = Repository.getArtifact("org.apache.tomee", "openejb-standalone", "zip");
app = Repository.getArtifact("org.apache.openejb.itests", "failover-ejb", "jar");
final File roothome = new File(dir, rootname);
Files.mkdir(roothome);
Zips.unzip(zip, roothome, true);
root = new StandaloneServer(roothome, roothome);
root.killOnExit();
root.getJvmOpts().add("-Dopenejb.classloader.forced-load=org.apache.openejb");
root.ignoreOut();
root.setProperty("name", rootname);
root.setProperty("openejb.extract.configuration", "false");
}
@AfterClass
public static void afterClass() {
for (final Map.Entry<String, StandaloneServer> entry : servers.entrySet()) {
try {
final StandaloneServer server = entry.getValue();
server.kill();
} catch (final Throwable t) {
//Ignore
}
}
if (null != root) {
try {
root.kill();
} catch (final Throwable t) {
//Ignore
}
}
}
@Test
public void test() throws Exception {
// To run in an IDE, uncomment and update this line
// System.setProperty("version", OpenEjbVersion.get().getVersion());
System.setProperty("openejb.client.connection.strategy", "roundrobin");
StandaloneServer.ServerService multipoint = root.getServerService("multipoint");
multipoint.setBind("localhost");
multipoint.setPort(getNextAvailablePort());
multipoint.setDisabled(false);
multipoint.set("discoveryName", rootname);
logger.info("Starting Root server");
root.start();
final Services services = new Services();
Client.addEventObserver(services);
for (final String name : new String[]{"red", "green", "blue"}) {
final File home = new File(dir, name);
Files.mkdir(home);
Zips.unzip(zip, home, true);
final StandaloneServer server = new StandaloneServer(home, home);
server.killOnExit();
server.ignoreOut();
server.setProperty("name", name);
server.setProperty("openejb.extract.configuration", "false");
server.getJvmOpts().add("-Dopenejb.classloader.forced-load=org.apache.openejb");
IO.copy(app, Files.path(home, "apps", "itest.jar"));
IO.copy(IO.read("<openejb><Deployments dir=\"apps/\"/></openejb>"), Files.path(home, "conf", "openejb.xml"));
final StandaloneServer.ServerService ejbd = server.getServerService("ejbd");
ejbd.setBind("localhost");
ejbd.setDisabled(false);
ejbd.setPort(getNextAvailablePort());
ejbd.setThreads(5);
final URI uri = URI.create(String.format("ejbd://%s:%s/%s", ejbd.getBind(), ejbd.getPort(), name));
ejbd.set("discovery", "ejb:" + uri);
services.add(uri);
server.getContext().set(URI.class, uri);
multipoint = server.getServerService("multipoint");
multipoint.setPort(getNextAvailablePort());
multipoint.setDisabled(false);
multipoint.set("discoveryName", name);
multipoint.set("initialServers", "localhost:" + root.getServerService("multipoint").getPort());
servers.put(name, server);
logger.info(String.format("Starting %s server", name));
server.start(1, TimeUnit.MINUTES);
}
System.setProperty("openejb.client.requestretry", "true");
System.setProperty("openejb.client.connection.strategy", "random");
logger.info("Beginning Test");
final Properties environment = new Properties();
environment.put(Context.INITIAL_CONTEXT_FACTORY, RemoteInitialContextFactory.class.getName());
environment.put(Context.PROVIDER_URL, "ejbd://localhost:" + servers.values().iterator().next().getServerService("ejbd").getPort() + "/provider");
final InitialContext context = new InitialContext(environment);
final Calculator bean = (Calculator) context.lookup("CalculatorBeanRemote");
// Lets restart one server. This will change the cluster configuration so when we call
// 'bean' business methods new ClusterMetaDataUpdated event will be triggered
servers.get("red").kill();
servers.get("red").start(1, TimeUnit.MINUTES);
for (final Map.Entry<String, StandaloneServer> entry : servers.entrySet()) {
final String name = entry.getKey();
final StandaloneServer server = entry.getValue();
final URI serverURI = server.getContext().get(URI.class);
logger.info("Waiting for updated list");
services.assertServices(2, TimeUnit.MINUTES, new CalculatorCallable(bean), 1500);
logger.info("Asserting balance");
assertBalance(bean, services.get().size());
logger.info("Shutting down " + name);
server.kill();
services.remove(serverURI);
}
logger.info("All Servers Shutdown");
try {
logger.info("Making one last request, expecting complete failover");
final String name = bean.name();
Assert.fail("Server should be destroyed: " + name);
} catch (final EJBException e) {
logger.info(String.format("Pass. Request resulted in %s: %s", e.getCause().getClass().getSimpleName(), e.getMessage()));
// good
}
}
private static void assertBalance(final Calculator bean, final int size) {
final int expectedInvocations = 1000;
final double percent = 0.10;
final int totalInvocations = size * expectedInvocations;
// Verify the work reached all servers
final Set<Map.Entry<String, AtomicInteger>> entries = invoke(bean, totalInvocations).entrySet();
Assert.assertEquals(size, entries.size());
// And each server got a minimum of %10 percent of the traffic
for (final Map.Entry<String, AtomicInteger> entry : entries) {
final int actualInvocations = entry.getValue().get();
Assert.assertTrue(String.format("%s out of %s is too low", actualInvocations, expectedInvocations), actualInvocations > expectedInvocations * percent);
}
}
private static Map<String, AtomicInteger> invoke(final Calculator bean, final int max) {
final Map<String, AtomicInteger> invocations = new HashMap<String, AtomicInteger>();
for (int i = 0; i < max; i++) {
final String name = bean.name();
if (!invocations.containsKey(name)) {
invocations.put(name, new AtomicInteger());
}
invocations.get(name).incrementAndGet();
}
for (final Map.Entry<String, AtomicInteger> entry : invocations.entrySet()) {
logger.info(String.format("Server %s invoked %s times", entry.getKey(), entry.getValue()));
}
return invocations;
}
public static class Services {
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private final Set<URI> expected = new HashSet<URI>();
public Services() {
}
public Set<URI> get() {
return expected;
}
public boolean add(final URI uri) {
return expected.add(uri);
}
public boolean remove(final URI o) {
return expected.remove(o);
}
@SuppressWarnings("unused")
public void observe(@Observes final ClusterMetaDataUpdated updated) {
final URI[] locations = updated.getClusterMetaData().getLocations();
final Set<URI> found = new HashSet<URI>(Arrays.asList(locations));
if (expected.equals(found)) {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}
}
public Set<URI> diff(final Set<URI> a, final Set<URI> b) {
final Set<URI> diffs = new HashSet<URI>();
for (final URI uri : b) {
if (!a.contains(uri)) {
diffs.add(uri);
}
}
return diffs;
}
public void assertServices(final long timeout, final TimeUnit unit, final Callable callable, final int delay) {
final ClientThread client = new ClientThread(callable);
client.delay(delay);
client.start();
try {
Assert.assertTrue(String.format("services failed to come online: waited %s %s", timeout, unit), await(timeout, unit));
} catch (final InterruptedException e) {
Thread.interrupted();
Assert.fail("Interrupted");
} finally {
client.stop();
}
}
public boolean await(final long timeout, final TimeUnit unit) throws InterruptedException {
lock.lock();
try {
return condition.await(timeout, unit);
} finally {
lock.unlock();
}
}
}
private static class CalculatorCallable implements Callable {
private final Calculator bean;
public CalculatorCallable(final Calculator bean) {
this.bean = bean;
}
@Override
public Object call() throws Exception {
Assert.assertEquals(3, bean.sum(1, 2));
return bean.name();
}
}
}