blob: 5c03d852616936a0aee8d671a17301ff7eda6e98 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.felix.dm.itest.api;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import org.apache.felix.dm.Component;
import org.apache.felix.dm.ComponentStateListener;
import org.apache.felix.dm.ServiceDependency;
import org.apache.felix.dm.itest.util.Ensure;
import org.apache.felix.dm.itest.util.TestBase;
import org.junit.Assert;
/**
* This test class simulates a client having many dependencies being registered concurrently.
* (threads are created manually, and we are not using a ComponentExecutorFactory).
* The services are then unregistered from a single thread (like it is the case when the osgi
* framework is stopped where bunldes are stopped synchronously).
* So, when unbind methods are called, we verify that unbound services are still started at
* the time unbind callbacks are invoked.
*
* @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public class ServiceRaceWithOrderedUnbindTest extends TestBase {
final static int STEP_WAIT = 5000;
final static int DEPENDENCIES = 10;
final static int LOOPS = 3000;
final Ensure m_done = new Ensure(true);
// Timestamp used to log the time consumed to execute 100 tests.
long m_timeStamp;
public interface Dep {
boolean isStarted();
}
public class DepImpl implements Dep {
volatile boolean m_started;
void start() {
m_started = true;
}
void stop() {
m_started = false;
}
public boolean isStarted() {
return m_started;
}
}
/**
* Creates many service dependencies, and activate/deactivate them concurrently.
*/
public void testCreatesComponentsConcurrently() {
m_dm.add(m_dm.createComponent()
.setImplementation(this)
.setCallbacks(null, "start", null, null));
m_done.waitForStep(1, 60000);
m_dm.clear();
Assert.assertFalse(super.errorsLogged());
}
void start() {
new Thread(this::doStart).start();
}
void doStart() {
info("Starting createParallelComponentRegistgrationUnregistration test");
initThreadPool(); // only if setParallel() has not been called (only if a parallel DM is not used).
try {
m_timeStamp = System.currentTimeMillis();
for (int loop = 0; loop < LOOPS; loop++) {
doTest(loop);
}
}
catch (Throwable t) {
error("got unexpected exception", t);
}
finally {
shutdownThreadPool();
m_done.step(1);
}
}
private void initThreadPool() {
if (! m_parallel) {
// We are not using a parallel DM, so we create a custom threadpool in order to add components concurrently.
int cores = Math.max(16, Runtime.getRuntime().availableProcessors());
info("using " + cores + " cores.");
m_threadPool = new ForkJoinPool(Math.max(cores, DEPENDENCIES + 3 /* start/stop/configure */));
}
}
void shutdownThreadPool() {
if (! m_parallel && m_threadPool != null) {
m_threadPool.shutdown();
try {
m_threadPool.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
}
}
void doTest(int loop) throws Throwable {
debug("loop#%d -------------------------", loop);
final Ensure step = new Ensure(false);
// Create one client component, which depends on many service dependencies
final Component client = m_dm.createComponent();
final Client clientImpl = new Client(step);
client.setImplementation(clientImpl);
// Before creating the client, register a component listener to check
// the client is really started or deactivated.
ComponentStateListener clientListener = (c, s) -> {
switch(s) {
case TRACKING_OPTIONAL:
step.step(1);
break;
case INACTIVE:
step.step(2);
break;
default:
break;
}
};
client.add(clientListener);
// Create client service dependencies
final ServiceDependency[] dependencies = new ServiceDependency[DEPENDENCIES];
for (int i = 0; i < DEPENDENCIES; i++) {
final String filter = "(id=loop" + loop + "." + i + ")";
dependencies[i] = m_dm.createServiceDependency().setService(Dep.class, filter)
.setRequired(true)
.setCallbacks("add", "remove");
client.add(dependencies[i]);
}
// Activate the client service dependencies concurrently.
List<Component> deps = new ArrayList();
for (int i = 0; i < DEPENDENCIES; i++) {
Hashtable h = new Hashtable();
h.put("id", "loop" + loop + "." + i);
final Component s = m_dm.createComponent()
.setInterface(Dep.class.getName(), h)
.setImplementation(new DepImpl());
deps.add(s);
schedule(() -> m_dm.add(s));
}
// Start the client (concurrently)
schedule(() -> m_dm.add(client));
// Ensure that client has been started.
step.waitForStep(1, STEP_WAIT); // client has entered in TRACKING_OPTIONAL state
Assert.assertEquals(DEPENDENCIES, clientImpl.getDependencies());
// Make sure threadpool is quiescent, then deactivate all components.
if (! m_threadPool.awaitQuiescence(5000, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("Could not start components timely.");
}
// Stop all dependencies, and client
schedule(() -> {
for (Component dep : deps) {
final Component dependency = dep;
m_dm.remove(dependency);
}
m_dm.remove(client);
});
// Ensure that client has been stopped, then destroyed, then unbound from all dependencies
step.waitForStep(2, STEP_WAIT); // Client entered in INACTIVE state
step.ensure();
Assert.assertEquals(0, clientImpl.getDependencies());
// Make sure threadpool is quiescent before doing next iteration.
if (! m_threadPool.awaitQuiescence(5000, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("Could not start components timely.");
}
if (super.errorsLogged()) {
throw new IllegalStateException("Race test interrupted (some error occured, see previous logs)");
}
debug("finished one test loop");
if ((loop + 1) % 100 == 0) {
long duration = System.currentTimeMillis() - m_timeStamp;
warn("Performed 100 tests (total=%d) in %d ms.", (loop + 1), duration);
m_timeStamp = System.currentTimeMillis();
}
}
private void schedule(Runnable task) {
if (! m_parallel) {
// not using parallel DM, so use our custom threadpool.
m_threadPool.execute(task);
} else {
task.run();
}
}
public class Client {
final Ensure m_step;
volatile int m_dependencies;
public Client(Ensure step) {
m_step = step;
}
void add(Dep d) {
Assert.assertNotNull(d);
m_dependencies ++;
}
void remove(Dep d) {
Assert.assertNotNull(d);
if (! d.isStarted()) {
Thread.dumpStack();
}
Assert.assertTrue(d.isStarted());
m_dependencies --;
}
int getDependencies() {
return m_dependencies;
}
}
}