blob: cf3ec2f5d9775b1175326a2c0f2aad736b523466 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.event.it;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.sling.discovery.TopologyEvent;
import org.apache.sling.discovery.TopologyEvent.Type;
import org.apache.sling.discovery.TopologyEventListener;
import org.apache.sling.discovery.TopologyView;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.NotificationConstants;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.junit.After;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractMaxParallelTest extends AbstractJobHandlingTest {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private static final int BACKGROUND_LOAD_DELAY_SECONDS = 1;
private static final int EXTRA_CHAOS_DURATION_SECONDS = 20;
private static final int UNKNOWN_TOPOLOGY_FACTOR_MILLIS = 15;//100;
private static final int STABLE_TOPOLOGY_FACTOR_MILLIS = 40;//300;
static final String TOPIC_PREFIX = "sling/maxparallel/";
static final String TOPIC_NAME = TOPIC_PREFIX + "zero";
private final Object syncObj = new Object();
protected int max = -1;
@Override
long backgroundLoadDelay() {
return BACKGROUND_LOAD_DELAY_SECONDS;
}
@Override
@After
public void cleanup() {
super.cleanup();
}
private void registerMax(int cnt) {
synchronized(syncObj) {
max = Math.max(max, cnt);
}
}
/**
* Setup consumers
*/
private void setupJobConsumers(long jobRunMillis) {
this.registerJobConsumer(TOPIC_NAME,
new JobConsumer() {
private AtomicInteger cnt = new AtomicInteger(0);
@Override
public JobResult process(final Job job) {
int c = cnt.incrementAndGet();
registerMax(c);
log.info("process : start delaying. count=" + c + ", id="+ job.getId());
try {
Thread.sleep(jobRunMillis);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
log.info("process : done delaying. count=" + c + ", id="+ job.getId());
cnt.decrementAndGet();
return JobResult.OK;
}
});
}
private static final class CreateJobThread extends Thread {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private final JobManager jobManager;
final AtomicLong finishedThreads;
private final Map<String, AtomicLong> created;
private final int numJobs;
public CreateJobThread(final JobManager jobManager,
Map<String, AtomicLong> created,
final AtomicLong finishedThreads,
int numJobs) {
this.jobManager = jobManager;
this.created = created;
this.finishedThreads = finishedThreads;
this.numJobs = numJobs;
}
@Override
public void run() {
AtomicInteger cnt = new AtomicInteger(0);
for(int i=0; i<numJobs; i++) {
final int c = cnt.incrementAndGet();
log.info("run: creating job " + c + " on topic " + TOPIC_NAME);
if (jobManager.addJob(TOPIC_NAME, null) != null) {
created.get(TOPIC_NAME).incrementAndGet();
}
}
finishedThreads.incrementAndGet();
}
}
/**
* Setup chaos thread(s)
*
* Chaos is right now created by sending topology changing/changed events randomly
*/
private void setupChaosThreads(final List<Thread> threads,
final AtomicLong finishedThreads, long duration) {
final List<TopologyView> views = new ArrayList<>();
// register topology listener
final ServiceRegistration<TopologyEventListener> reg = this.bc.registerService(TopologyEventListener.class, new TopologyEventListener() {
@Override
public void handleTopologyEvent(final TopologyEvent event) {
if ( event.getType() == Type.TOPOLOGY_INIT ) {
views.add(event.getNewView());
}
}
}, null);
while ( views.isEmpty() ) {
this.sleep(10);
}
reg.unregister();
final TopologyView view = views.get(0);
try {
final Collection<ServiceReference<TopologyEventListener>> refs = this.bc.getServiceReferences(TopologyEventListener.class, null);
assertNotNull(refs);
assertFalse(refs.isEmpty());
TopologyEventListener found = null;
for(final ServiceReference<TopologyEventListener> ref : refs) {
final TopologyEventListener listener = this.bc.getService(ref);
if ( listener != null && listener.getClass().getName().equals("org.apache.sling.event.impl.jobs.config.TopologyHandler") ) {
found = listener;
break;
}
bc.ungetService(ref);
}
assertNotNull(found);
final TopologyEventListener tel = found;
log.info("setupChaosThreads : simulating TOPOLOGY_INIT");
tel.handleTopologyEvent(new TopologyEvent(Type.TOPOLOGY_INIT, null, view));
threads.add(new Thread() {
private final Random random = new Random();
@Override
public void run() {
final long startTime = System.currentTimeMillis();
// this thread runs 30 seconds longer than the job creation thread
final long endTime = startTime + (duration +EXTRA_CHAOS_DURATION_SECONDS) * 1000;
while ( System.currentTimeMillis() < endTime ) {
final int sleepTime = random.nextInt(25) + 15;
try {
Thread.sleep(sleepTime * STABLE_TOPOLOGY_FACTOR_MILLIS);
} catch ( final InterruptedException ie) {
Thread.currentThread().interrupt();
}
log.info("setupChaosThreads : simulating TOPOLOGY_CHANGING");
tel.handleTopologyEvent(new TopologyEvent(Type.TOPOLOGY_CHANGING, view, null));
final int changingTime = random.nextInt(20) + 3;
try {
Thread.sleep(changingTime * UNKNOWN_TOPOLOGY_FACTOR_MILLIS);
} catch ( final InterruptedException ie) {
Thread.currentThread().interrupt();
}
log.info("setupChaosThreads : simulating TOPOLOGY_CHANGED");
tel.handleTopologyEvent(new TopologyEvent(Type.TOPOLOGY_CHANGED, view, view));
}
tel.getClass().getName();
finishedThreads.incrementAndGet();
}
});
} catch (InvalidSyntaxException e) {
e.printStackTrace();
}
}
void doTestMaxParallel(int numJobs, long jobRunMillis, long duration) throws Exception {
final JobManager jobManager = this.getJobManager();
final Map<String, AtomicLong> added = new HashMap<>();
final Map<String, AtomicLong> created = new HashMap<>();
final Map<String, AtomicLong> finished = new HashMap<>();
final List<String> topics = new ArrayList<>();
added.put(TOPIC_NAME, new AtomicLong());
created.put(TOPIC_NAME, new AtomicLong());
finished.put(TOPIC_NAME, new AtomicLong());
topics.add(TOPIC_NAME);
final List<Thread> threads = new ArrayList<>();
final AtomicLong finishedThreads = new AtomicLong();
this.registerEventHandler("org/apache/sling/event/notification/job/*",
new EventHandler() {
@Override
public void handleEvent(final Event event) {
final String topic = (String) event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
if ( NotificationConstants.TOPIC_JOB_FINISHED.equals(event.getTopic())) {
finished.get(topic).incrementAndGet();
} else if ( NotificationConstants.TOPIC_JOB_ADDED.equals(event.getTopic())) {
added.get(topic).incrementAndGet();
}
}
});
// setup job consumers
this.setupJobConsumers(jobRunMillis);
// setup job creation tests
new CreateJobThread(jobManager, created, finishedThreads, numJobs).start();
// wait until 1 job is being processed
log.info("doTestMaxParallel : waiting until 1 job is being processed");
while ( max <= 0 ) {
this.sleep(100);
}
log.info("doTestMaxParallel : 1 job was processed, ready to go. max=" + max);
this.setupChaosThreads(threads, finishedThreads, duration);
log.info("doTestMaxParallel : starting threads (" + threads.size() + ")");
// start threads
for(final Thread t : threads) {
t.setDaemon(true);
t.start();
}
log.info("doTestMaxParallel: sleeping for " + duration + " seconds to wait for threads to finish...");
// for sure we can sleep for the duration
this.sleep(duration * 1000);
log.info("doTestMaxParallel: polling for threads to finish...");
// wait until threads are finished
while ( finishedThreads.get() < threads.size() ) {
this.sleep(100);
}
final Set<String> allTopics = new HashSet<>(topics);
log.info("doTestMaxParallel: waiting for job handling to finish... " + allTopics.size());
while ( !allTopics.isEmpty() ) {
final Iterator<String> iter = allTopics.iterator();
while ( iter.hasNext() ) {
final String topic = iter.next();
log.info("doTestMaxParallel: checking topic= " + topic +
", finished=" + finished.get(topic).get() + ", created=" + created.get(topic).get());
if ( finished.get(topic).get() == created.get(topic).get() ) {
iter.remove();
}
}
log.info("doTestMaxParallel: waiting for job handling to finish... " + allTopics.size());
this.sleep(1000);
}
log.info("doTestMaxParallel: done.");
}
}